Skip to content

Observando Streams y Futures

Ya aprendiste a observar datos síncronos. Ahora manejemos datos async con Streams y Futures.

¿Por Qué Funciones Especiales?

Streams y Futures son diferentes de Listenable:

  • Stream - Emite múltiples valores a lo largo del tiempo
  • Future - Se completa una vez con un valor
  • Ambos tienen estados de carga/error

watch_it proporciona watchStream() y watchFuture() - como StreamBuilder y FutureBuilder, pero en una línea.

watchStream - Streams Reactivos

Reemplaza StreamBuilder con watchStream():

dart
class ChatWidget extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    // One line instead of StreamBuilder!
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      initialValue: 'Waiting for messages...',
    );

    return Text(snapshot.data ?? 'No data');
  }
}

Manejando Estados de Stream

dart
class UserActivity extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final snapshot = watchStream(
      (UserService s) => s.activityStream,
      initialValue: 'No activity',
    );

    // Check state like StreamBuilder
    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }

    if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}');
    }

    return Text('Activity: ${snapshot.data}');
  }
}

AsyncSnapshot y Null Safety

Cuando proporcionas un initialValue no-null y usas un tipo de stream no-nullable (como Stream<String>), AsyncSnapshot.data no será null. Comienza con tu valor inicial y se actualiza con eventos del stream:

dart
class MessageWidget extends WatchingWidget {
  const MessageWidget({super.key});

  @override
  Widget build(BuildContext context) {
    // Stream<String> - non-nullable type
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      initialValue: 'Waiting for messages...',
    );

    // Safe to use ! because:
    // 1. We provided a non-null initialValue
    // 2. Stream type is non-nullable (String, not String?)
    return Column(
      children: [
        Text(snapshot.data!),
        if (snapshot.connectionState == ConnectionState.waiting)
          Text('(connecting...)', style: TextStyle(fontSize: 12)),
      ],
    );
  }
}

Nota: Si tu tipo de stream es nullable (como Stream<String?>), entonces los eventos del stream pueden emitir valores null, haciendo que snapshot.data sea null incluso con un initialValue no-null.

Comparar con StreamBuilder

Sin watch_it:

dart
class UserActivity extends StatelessWidget {
  const UserActivity({super.key});

  @override
  Widget build(BuildContext context) {
    return StreamBuilder(
      stream: di<UserService>().activityStream,
      initialData: 'No activity',
      builder: (context, snapshot) {
        if (snapshot.connectionState == ConnectionState.waiting) {
          return CircularProgressIndicator();
        }
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        return Text('Activity: ${snapshot.data}');
      },
    );
  }
}

¡Mucho más anidado y verboso!

Uso Avanzado de watchStream

Observar Streams Locales (parámetro target)

Si tu stream no está registrado en get_it, usa el parámetro target:

dart
class TimerWidget extends WatchingWidget {
  const TimerWidget({super.key, required this.timerStream});

  final Stream<int> timerStream;

  @override
  Widget build(BuildContext context) {
    // Watch a stream passed as parameter (not from get_it)
    final snapshot = watchStream(
      null, // No selector needed
      target: timerStream, // Watch this stream directly
      initialValue: 0,
    );

    return Text('Seconds: ${snapshot.data}');
  }
}

Cuándo usar:

  • Stream pasado como parámetro de widget
  • Streams creados localmente
  • Streams de packages externos

Permitir Cambios de Stream (allowStreamChange)

Por defecto, watchStream se comporta diferente dependiendo de cómo proporciones el stream:

  • Con función select: Llama al selector una vez para prevenir crear múltiples streams en cada reconstrucción
  • Con parámetro target: Lanza error si la instancia del stream cambia entre reconstrucciones

Establece allowStreamChange: true si esperas que el stream cambie legítimamente entre reconstrucciones:

dart
class ChatRoomWidget extends WatchingWidget {
  const ChatRoomWidget({super.key, required this.selectedRoomId});

  final String selectedRoomId;

  @override
  Widget build(BuildContext context) {
    final chatService = createOnce(() => ChatRoomService());

    // Stream changes when selectedRoomId changes
    final snapshot = watchStream(
      null,
      target: chatService.getRoomStream(selectedRoomId),
      initialValue: 'No messages yet',
      allowStreamChange: true, // Allow switching between room streams
    );

    return Column(
      children: [
        Text('Room: $selectedRoomId'),
        Text('Last message: ${snapshot.data}'),
      ],
    );
  }
}

Qué sucede con allowStreamChange: true:

  • La función selectora se llama y evalúa en cada construcción
  • Si la instancia del stream cambió, watchStream automáticamente se desuscribe del stream antiguo
  • Se suscribe al nuevo stream
  • El widget se reconstruye con datos del nuevo stream

Cuándo usar:

  • El stream depende de parámetros reactivos (como ID de sala seleccionada)
  • Cambiar entre diferentes streams basados en entrada del usuario
  • Importante: Solo usa cuando el stream debería realmente cambiar, no cuando accidentalmente se recrea el mismo stream

Firma Completa del Método

dart
AsyncSnapshot<R> watchStream<T extends Object, R>(
  Stream<R> Function(T)? select, {
  T? target,
  R? initialValue,
  bool preserveState = true,
  bool allowStreamChange = false,
  String? instanceName,
  GetIt? getIt,
})

Todos los parámetros:

  • select - Función para obtener Stream del objeto registrado (opcional si usas target)
  • target - Stream directo a observar (opcional, no desde get_it)
  • initialValue - Valor mostrado antes del primer evento del stream (hace que data nunca sea null)
  • preserveState - Mantener último valor cuando el stream cambia (predeterminado: true)
  • allowStreamChange - Permitir que la instancia del stream cambie (predeterminado: false)
  • instanceName - Para registros con nombre
  • getIt - Instancia GetIt personalizada (raramente necesario)

watchFuture - Futures Reactivos

Reemplaza FutureBuilder con watchFuture():

dart
class DataWidget extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final snapshot = watchFuture(
      (DataService s) => s.fetchTodos(),
      initialValue: [],
    );

    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }

    if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}');
    }

    return Text('Data: ${snapshot.data?.length} items');
  }
}

AsyncSnapshot y Null Safety

Al igual que watchStream, cuando proporcionas un initialValue no-null a watchFuture con un tipo de future no-nullable (como Future<String>), AsyncSnapshot.data no será null. Ver el tip de AsyncSnapshot arriba para detalles.

Patrón Común: Inicialización de App

dart
class SplashScreen extends WatchingWidget {
  const SplashScreen({super.key});

  @override
  Widget build(BuildContext context) {
    // Selector function called once - starts initialization automatically
    final snapshot = watchFuture(
      (AppService s) => s.initialize(),
      initialValue: false,
    );

    if (snapshot.connectionState == ConnectionState.waiting) {
      return Column(
        children: [
          CircularProgressIndicator(),
          Text('Initializing...'),
        ],
      );
    }

    if (snapshot.hasError) {
      return ErrorScreen(error: snapshot.error);
    }

    // Initialization complete - navigate
    callOnce((_) {
      Navigator.pushReplacement(
        context,
        MaterialPageRoute(builder: (_) => HomeScreen()),
      );
    });

    return Container(); // Brief moment before navigation
  }
}

Avanzado: Esperar Múltiples Dependencias

Si necesitas esperar que múltiples servicios async se inicialicen (como database, auth, config), usa allReady() en lugar de futures individuales. Ver Inicialización Async con allReady para más detalles.

Uso Avanzado de watchFuture

Permitir Cambios de Future (allowFutureChange)

Por defecto, watchFuture se comporta diferente dependiendo de cómo proporciones el future:

  • Con función select: Llama al selector una vez para prevenir crear múltiples futures en cada reconstrucción
  • Con parámetro target: Lanza error si la instancia del future cambia entre reconstrucciones

Establece allowFutureChange: true si esperas que el future cambie legítimamente entre reconstrucciones (como operaciones de reintentar):

dart
class DataWidget extends WatchingWidget {
  const DataWidget({super.key});

  @override
  Widget build(BuildContext context) {
    // Create retry counter
    final retryCount = createOnce(() => ValueNotifier<int>(0));

    // Watch the future - future changes when retryCount changes
    final snapshot = watchFuture(
      (DataService s) => s.fetchTodos(),
      initialValue: null,
      allowFutureChange: true, // Allow new future on retry
    );

    return Column(
      children: [
        if (snapshot.data == null && !snapshot.hasError)
          CircularProgressIndicator()
        else if (snapshot.hasError)
          Column(
            children: [
              Text('Error: ${snapshot.error}'),
              ElevatedButton(
                // Trigger new future by changing retryCount
                onPressed: () => retryCount.value++,
                child: Text('Retry'),
              ),
            ],
          )
        else
          Text('Loaded ${snapshot.data!.length} items'),
      ],
    );
  }
}

Qué sucede con allowFutureChange: true:

  • La función selectora se llama y evalúa en cada construcción
  • Si la instancia del future cambió, watchFuture comienza a observar el nuevo future
  • El widget se reconstruye cuando el nuevo future se completa
  • La completación del future anterior se ignora

Cuándo usar:

  • Funcionalidad de reintentar para requests fallidas
  • El future depende de parámetros reactivos que cambian
  • Importante: Solo usa cuando el future debería realmente cambiar, no cuando accidentalmente se recrea el mismo future

Firma Completa del Método

dart
AsyncSnapshot<R> watchFuture<T extends Object, R>(
  Future<R> Function(T)? select, {
  T? target,
  required R initialValue,
  bool preserveState = true,
  bool allowFutureChange = false,
  String? instanceName,
  GetIt? getIt,
})

Todos los parámetros:

  • select - Función para obtener Future del objeto registrado (opcional si usas target)
  • target - Future directo a observar (opcional, no desde get_it)
  • initialValue - Requerido. Valor mostrado antes de que el future se complete (hace que data nunca sea null)
  • preserveState - Mantener último valor cuando el future cambia (predeterminado: true)
  • allowFutureChange - Permitir que la instancia del future cambie (predeterminado: false)
  • instanceName - Para registros con nombre
  • getIt - Instancia GetIt personalizada (raramente necesario)

Múltiples Fuentes Async

Observa múltiples streams o futures:

dart
class Dashboard extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final messages = watchStream(
      (MessageService s) => s.messageStream,
      initialValue: <Message>[],
    );

    final notifications = watchStream(
      (NotificationService s) => s.notificationStream,
      initialValue: 0,
    );

    return Column(
      children: [
        Text('Messages: ${messages.data?.length ?? 0}'),
        Text('Notifications: ${notifications.data}'),
      ],
    );
  }
}

Mezclar Sync y Async

Combina datos síncronos y asíncronos:

dart
class UserProfile extends WatchingWidget {
  const UserProfile({super.key});

  @override
  Widget build(BuildContext context) {
    // Synchronous data
    final userId = watchValue((SimpleUserManager m) => m.name);

    // Asynchronous data - fetch stats from API based on current userId
    final statsSnapshot = watchFuture(
      (ApiClient api) => api.get('/users/$userId/stats'),
      initialValue: null,
    );

    return Column(
      children: [
        Text('User: $userId'),
        if (statsSnapshot.data == null)
          CircularProgressIndicator()
        else if (statsSnapshot.hasError)
          Text('Error loading stats')
        else
          Text('Stats: ${statsSnapshot.data!['data']}'),
      ],
    );
  }
}

Guía Rápida de AsyncSnapshot

Tanto watchStream() como watchFuture() retornan AsyncSnapshot<T>:

dart
// Check connection state
snapshot.connectionState == ConnectionState.waiting;
snapshot.connectionState == ConnectionState.done;

// Check for data/errors
snapshot.hasData; // true if data available
snapshot.hasError; // true if error occurred

// Access data/error
snapshot.data; // The value (T?)
snapshot.error; // The error if any

Patrones Comunes

Patrón 1: Carga Simple

dart
final snapshot = watchFuture(
  (DataService s) => s.fetchTodos(),
  initialValue: null,
);

if (snapshot.connectionState == ConnectionState.waiting) {
  return CircularProgressIndicator();
}

return Text('Data loaded: ${snapshot.data?.length} items');

Patrón 2: Manejo de Errores

dart
final snapshot = watchStream(
  (MessageService s) => s.messageStream,
  initialValue: <Message>[],
);

if (snapshot.hasError) {
  return Column(
    children: [
      Text('Error: ${snapshot.error}'),
      ElevatedButton(
        onPressed: () {}, // Retry logic
        child: Text('Retry'),
      ),
    ],
  );
}

return ListView(children: snapshot.data!.map((m) => Text(m.text)).toList());

¡No Más Builders Anidados!

Antes:

dart
return FutureBuilder(
  future: initFuture,
  builder: (context, futureSnapshot) {
    if (futureSnapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }

    return StreamBuilder(
      stream: dataStream,
      builder: (context, streamSnapshot) {
        if (streamSnapshot.connectionState == ConnectionState.waiting) {
          return CircularProgressIndicator();
        }
        return Text(streamSnapshot.data!);
      },
    );
  },
);

Después:

dart
final initSnapshot = watchFuture(
  (AppService s) => s.initialize(),
  initialValue: false,
);

final dataSnapshot = watchStream(
  (ChatService s) => s.messageStream,
  initialValue: '',
);

if (initSnapshot.connectionState == ConnectionState.waiting ||
    dataSnapshot.connectionState == ConnectionState.waiting) {
  return CircularProgressIndicator();
}

return Text(dataSnapshot.data!);

¡Código plano y legible!

Puntos Clave

watchStream() reemplaza StreamBuilder - sin anidación ✅ watchFuture() reemplaza FutureBuilder - mismo beneficio ✅ Ambos retornan AsyncSnapshot<T> - misma API que conoces ✅ Suscripción y limpieza automáticas ✅ Combina datos sync y async fácilmente

Siguiente: Aprende sobre efectos secundarios con handlers.

Ver También

Publicado bajo la Licencia MIT.