Observable.delay(delayErrors=true) drops onNext signals at random when disposed of concurrently #7851

Open
joepembe opened this issue Apr 3, 2025 · 3 comments


joepembe commented Apr 3, 2025

When a subscription to an Observable.delay(delayErrors=true) operator is disposed of concurrently with the subscription's emissions, some of the onNext signals it issues to its downstream can be dropped at random while later ones are still delivered.

See the following repro:

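// Imports assume RxJava 3.x; the loop runs inside a method declaring `throws InterruptedException` (for latch.await()).
import io.reactivex.rxjava3.core.Notification;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Scheduler scheduler = Schedulers.computation();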
int maxValue = 10;
int maxAttempts = 1000;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
    ConcurrentLinkedDeque<Notification<Integer>> sink = new ConcurrentLinkedDeque<>();
    SequentialDisposable firstDisposable = new SequentialDisposable();
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> integers = Observable.range(0, maxValue);
    Observable<Notification<Integer>> first = integers.delay(5, TimeUnit.MICROSECONDS, scheduler, true)
            .materialize()
            .doOnNext(sink::add)
            .doOnSubscribe(firstDisposable::replace);
    Observable<?> second = integers.delay(5, TimeUnit.MICROSECONDS, scheduler)
            .doOnNext($ -> firstDisposable.dispose())
            .doOnTerminate(latch::countDown);

    first.subscribe();
    second.subscribe();
    latch.await();

    boolean foundTermination = false;
    int nextExpectedValue = 0;
    List<String> errors = new ArrayList<>();
    List<Notification<Integer>> notifications = List.copyOf(sink);
    for (int i = 0; i < notifications.size(); i++) {
        Notification<Integer> notification = notifications.get(i);
        if (notification.isOnComplete() || notification.isOnError()) {
            foundTermination = true;
        }
        if (notification.isOnNext()) {
            if (foundTermination) {
                errors.add((errors.size() + 1) + ". Found onNext after termination at index " + i);
            }
            int value = notification.getValue();
            if (value != nextExpectedValue) {
                errors.add((errors.size() + 1) + ". Wrong value at index " + i + " (expected " + nextExpectedValue +
                        ", was " + value + ")");
            }
            nextExpectedValue = value + 1;
        }
    }
    if (!errors.isEmpty()) {
        throw new AssertionError(new StringBuilder()
                .append("Attempt ")
                .append(attempt)
                .append(" failed.\n\t\tError(s):\n\t\t\t")
                .append(String.join("\n\t\t\t", errors))
                .append("\n\t\tNotifications: ")
                .append(notifications)
                .toString());
    }
}

Here is an example failure:

java.lang.AssertionError: Attempt 11 failed.
        Error(s):
            1. Wrong value at index 2 (expected 2, was 3)
            2. Wrong value at index 3 (expected 4, was 5)
            3. Wrong value at index 6 (expected 8, was 9)
        Notifications: [OnNextNotification[0], OnNextNotification[1], OnNextNotification[3], OnNextNotification[5], OnNextNotification[6], OnNextNotification[7], OnNextNotification[9]]

I understand that a subscription that is actively producing signals may continue to propagate them downstream when disposed of concurrently (i.e. disposal is "eventually consistent"). But I would expect any signals emitted during or after the race to preserve their original order, without gaps. In other words, once delay() has caught up with its newly disposed state and skips its Nth onNext signal, it seems wrong for it to go on to emit its (N+1)th signal.
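To make the expected guarantee concrete: for a source that emits 0, 1, 2, ..., whatever reaches the downstream should be 0..k-1 for some k, even when k is smaller than the source length. The helper below (`PrefixCheck.firstGap`, a hypothetical name, not part of RxJava) is a minimal sketch of that check; applied to the values from the example failure above, it flags index 2.

```java
import java.util.List;

// Hypothetical helper: expresses "delivered values form a gap-free prefix 0..k-1".
class PrefixCheck {

    /** Returns the index of the first value that breaks 0, 1, 2, ..., or -1 if there is none. */
    static int firstGap(List<Integer> delivered) {
        for (int i = 0; i < delivered.size(); i++) {
            if (delivered.get(i) != i) { // Integer is unboxed, so this is a numeric comparison
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Values from the example failure in the report.
        List<Integer> observed = List.of(0, 1, 3, 5, 6, 7, 9);
        System.out.println(firstGap(observed));         // 2: value 2 was skipped, yet 3 arrived
        System.out.println(firstGap(List.of(0, 1, 2))); // -1: dropping only the tail is acceptable
    }
}
```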

akarnokd (Member) commented Apr 3, 2025

Thanks for the report. I'll look into it in the coming days.

Until then, try adding .onTerminateDetach() right after delay() to force the sequence to disconnect its onNext call chain.
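As an illustration of what "disconnecting the onNext call chain" can mean, here is a minimal stdlib sketch, assuming a relay that keeps its downstream in a single shared reference and swaps in a no-op consumer when detached. `DetachingRelay` is a hypothetical name and this is not RxJava's onTerminateDetach implementation; it only shows why, once that one switch flips, every later onNext goes nowhere instead of some of them slipping through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

// Hypothetical relay illustrating the "detach" idea (not RxJava's onTerminateDetach source).
class DetachingRelay implements IntConsumer {
    private static final IntConsumer NO_OP = value -> { }; // swallows late signals

    // Single shared reference: every onNext reads it, detach() overwrites it once.
    private final AtomicReference<IntConsumer> downstream;

    DetachingRelay(IntConsumer downstream) {
        this.downstream = new AtomicReference<>(downstream);
    }

    @Override
    public void accept(int value) { // stands in for onNext(value)
        downstream.get().accept(value);
    }

    void detach() { // stands in for dispose() or termination
        downstream.set(NO_OP);
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        DetachingRelay relay = new DetachingRelay(sink::add);
        for (int i = 0; i < 10; i++) {
            if (i == 3) {
                relay.detach(); // dispose() lands before item 3 is emitted
            }
            relay.accept(i); // items 3..9 arrive "late" and are swallowed
        }
        System.out.println(sink); // [0, 1, 2]
    }
}
```

Because every signal reads the same reference, what reaches the sink is always a prefix of what was emitted, assuming the emissions themselves are serialized as the Observable contract requires.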

joepembe (Author) commented Apr 3, 2025

Here is a simpler, more reliable repro:

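// Imports assume RxJava 3.x; List.copyOf requires Java 10+.
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

for (int attempt = 1; attempt <= 1000; attempt++) {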
    ConcurrentLinkedDeque<Integer> sink = new ConcurrentLinkedDeque<>();
    SequentialDisposable disposable = new SequentialDisposable();
    disposable.replace(Observable.range(0, 10)
            .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true)
            .doOnNext(value -> {
                if (!disposable.isDisposed()) {
                    Schedulers.computation().scheduleDirect(disposable::dispose);
                }
                sink.add(value);
            })
            .subscribe());
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
    List<Integer> values = List.copyOf(sink);
    List<String> errors = new ArrayList<>();
    int expected = 0;
    for (int i = 0; i < values.size(); i++) {
        int value = values.get(i);
        if (value != expected) {
            errors.add((errors.size() + 1) + ". Wrong value at index " + i + " (expected " + expected +
                    ", was " + value + ")");
        }
        expected = value + 1;
    }
    if (!errors.isEmpty()) {
        throw new AssertionError(new StringBuilder()
                .append("Attempt ")
                .append(attempt)
                .append(" failed.\n\t\tError(s):\n\t\t\t")
                .append(String.join("\n\t\t\t", errors))
                .append("\n\t\tValues: ")
                .append(values)
                .toString());
    }
}

On my MacBook M2 Max, this seems to reproduce the problematic behavior reliably within the first couple of attempts.

akarnokd (Member) commented Apr 5, 2025

The problem is caused by a race between the individual delayed event emissions and the code that cancels each of the associated tasks one by one, so some of those tasks still run and deliver their onNext after an earlier one has been cancelled.
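The race can be sketched with a deterministic, single-threaded model (hypothetical; not RxJava's code). Assumptions: each delayed item is a separately cancellable task, one worker runs the tasks in order, and dispose() cancels them one at a time; the `script` string fixes the interleaving ('R' = the worker runs its next task, 'C' = dispose() cancels its next task). With per-task cancellation alone, a task that has not been cancelled yet still delivers its value after an earlier one was skipped. The `sharedDisposedFlag` variant, where disposal first sets one flag that every task checks before emitting, is just one possible mitigation and not necessarily the fix the maintainers have in mind.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, deterministic model of the race (not RxJava's actual implementation).
class DelayRaceModel {

    /**
     * Simulates `items` delayed onNext tasks (values 0..items-1).
     * script: 'R' = the worker runs its next task, 'C' = dispose() cancels its next task.
     * sharedDisposedFlag: if true, every task also checks a flag set when disposal starts.
     */
    static List<Integer> simulate(int items, String script, boolean sharedDisposedFlag) {
        boolean[] cancelled = new boolean[items]; // per-task cancellation state
        boolean disposed = false;                 // would need to be volatile/atomic with real threads
        List<Integer> delivered = new ArrayList<>();
        int nextToRun = 0;
        int nextToCancel = 0;
        for (char step : script.toCharArray()) {
            if (step == 'C' && nextToCancel < items) {
                disposed = true;                  // disposal has started
                cancelled[nextToCancel++] = true; // cancel one task (no effect if it already ran)
            } else if (step == 'R' && nextToRun < items) {
                int value = nextToRun++;
                // Without the shared flag, a task only knows whether it was cancelled itself.
                boolean skip = cancelled[value] || (sharedDisposedFlag && disposed);
                if (!skip) {
                    delivered.add(value); // stands in for downstream.onNext(value)
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // The worker delivers 0 and 1, dispose() cancels tasks 0..2, then the worker resumes.
        String script = "RRCCCRRR";
        System.out.println(simulate(5, script, false)); // [0, 1, 3, 4]
        System.out.println(simulate(5, script, true));  // [0, 1]
    }
}
```

Because the tasks run serially on one worker and the flag never resets, once one task observes it, every later task does too, so the delivered values stay a gap-free prefix.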

As a quick workaround, you can use onTerminateDetach() to make sure the sequence ends and no stray onNext event can get through:

disposable.replace(Observable.range(0, 10)
        .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true)
        .onTerminateDetach() // <-- workaround: added right after delay()
        .doOnNext(value -> { /* same body as in the repro above */ })
        .subscribe());

I'll work out a more direct fix in a few days.
