Own StreamTransformer class and StreamTransformer.fromHandlers method

Home   »   Own StreamTransformer class and StreamTransformer.fromHandlers method

/*
* https://gist.github.com/web-architector/c0289dbd272b5256cf7c0e912113702b
* https://dartpad.dev/c0289dbd272b5256cf7c0e912113702b?null_safety=true
*/
import 'dart:async';
import 'dart:math' as math;
// import 'package:meta/meta.dart'/*uncomment on prod*/;

/// https://youtu.be/SZGG7uTBXJ4?t=8463
/// https://api.dart.dev/stable/2.10.5/dart-async/StreamTransformer-class.html
void main(List arguments) {
  Future(() {
    /// Вариант 1 через создание класса трансофрмера [MathStreamTransformer]
    final stream1 = Stream.periodic(const Duration(milliseconds: 500), (i) => i);
    stream1
        .skip(3)
        .take(3)
        // .transform(const MathStreamTransformer())/*or use extension math method below*/
        .math() /*either this method or use transform method above*/
        .forEach((element) {
      print(element);
    });
  });

  Future(() {
    /// Вариант 2 без созданя класса трансформера, с прямой трасформцацией через [StreamTransformer.fromHandlers]
    final stream2 = Stream.periodic(const Duration(milliseconds: 500), (i) => i);
    // Uncomment below for testing handleError callback
    // final stream2 = Stream.error(UnsupportedError('Error for testing handleError callback'));
    stream2
        .skip(3)
        .take(3)
        .transform(StreamTransformer.fromHandlers(
          handleData: (int data, sink) {
            sink
              ..add('y = $data')
              ..add('y * 2 = ${data * 2}')
              ..add('y * 3 = ${data * 3}')
              ..add('y ^ 2 = ${math.pow(data, 2)}')
              ..add('===============================');
          },
          handleError: /*логика обработки ошибок в стриме*/ (error, stackTrace, sink) {
            sink
              ..add(error.toString()) /*тесктовое описание ошибки*/
              ..addError(
                  error, stackTrace) /*или добавить в стрим как объект ошибки - программа terminate with exit code 255*/
              ..add('y = undefined') /*или замена ошибки на дргугие данные*/;
          },
        ))
        .forEach(print);
  });
}

// @immutable /*uncomment on prod*/
class MathStreamTransformer extends StreamTransformerBase {
  const MathStreamTransformer();

  /// Transforms the provided stream.
  /// Returns a new stream with events that are computed from events of the provided stream.
  @override
  Stream bind(Stream stream) {
    StreamSubscription? sub;

    // создаем конроллер нового стрима для выходных событий
    final controller = StreamController(
      /*управляем созданным стримом в зависимости от состаяния входного стрима*/
      onPause: () => sub?.pause(),
      onResume: () => sub?.resume(),
      onCancel: () => sub?.cancel(),

      /// If [sync] is true, the returned stream controller is a
      /// [SynchronousStreamController], and must be used with the care
      /// and attention necessary to not break the [Stream] contract. If in doubt,
      /// use the non-sync version.
      sync: false,
    );
    // слушаем входной стрим и трасформируем его события в события выходного стрима
    sub = stream.listen((event) {
      controller
        ..add('x = $event')
        ..add('x * 2 = ${event * 2}')
        ..add('x * 3 = ${event * 3}')
        ..add('x ^ 2 = ${math.pow(event, 2)}')
        ..add('===============================');
    });
    return stream.isBroadcast ? controller.stream.asBroadcastStream() : controller.stream;
  }
}

/// sourceStream.math()
extension MathX on Stream {
  Stream math() => transform(const MathStreamTransformer());
}

Leave a Reply

Your email address will not be published. Required fields are marked *