import {catchError, EMPTY, finalize, mergeMap, Observable, Subject, tap} from "rxjs";
import {action, computed, makeObservable, observable, reaction} from "mobx";

/**
 * Funnels work items through one shared RxJS pipeline and exposes, as MobX
 * observable state, the set of keys that are currently being processed.
 *
 * Every item submitted with {@link PipeManager.process} is passed to the
 * `process` callback given to the constructor; the observables it returns are
 * subscribed concurrently through `mergeMap`. A key is added to `inPipe` when
 * its item is submitted and removed when the item's observable completes,
 * errors, or is torn down.
 *
 * Keys should be unique among in-flight items: `inPipe` is a `Set`, so two
 * concurrent items sharing a key are counted once and the first one to finish
 * removes the key for both.
 */
export class PipeManager<T, R = any> {
    private readonly pipe = new Subject<[number, T]>()
    readonly inPipe = new Set<number>();

    /**
     * @param process Callback that turns a submitted item into the observable
     *   performing the work. Failures (thrown synchronously or emitted as an
     *   error notification) are logged and otherwise ignored.
     */
    constructor(process: (o: T) => Observable<R>) {
        makeObservable(this, {
            process: false,
            inPipe: observable,
            setInPipe: action,
            hasInPipe: computed,
            nInPipe: computed,
        })

        // Logging must never throw: an exception escaping from inside the
        // pipeline would error the single shared subscription and every later
        // item would be dropped. JSON.stringify throws on circular structures
        // and BigInt values, so it is guarded here.
        const reportFailure = (key: number, object: T, err: unknown): void => {
            let serialized: string;
            try {
                serialized = JSON.stringify(object);
            } catch {
                serialized = '[unserializable]';
            }
            console.error('Cannot process element', key, serialized, err);
        };

        this.pipe.pipe(
            tap(([key]) => this.setInPipe(key, true)),
            mergeMap(([key, object]): Observable<R> => {
                try {
                    return process(object).pipe(
                        finalize(() => this.setInPipe(key, false)),
                        catchError((err: unknown) => {
                            reportFailure(key, object, err);
                            return EMPTY; // TODO: Currently we fail silently
                        }),
                    );
                } catch (err) {
                    // `process` threw before an observable existed (or returned
                    // something without `.pipe`), so `finalize` was never
                    // attached: release the key here and keep the outer
                    // stream alive for subsequent items.
                    this.setInPipe(key, false);
                    reportFailure(key, object, err);
                    return EMPTY;
                }
            })
        ).subscribe()
    }

    /**
     * Submits an item for processing.
     *
     * @param key Identifier tracked in `inPipe` while the item is in flight.
     * @param object Payload forwarded to the constructor's `process` callback.
     */
    process(key: number, object: T): void {
        this.pipe.next([key, object]);
    }

    /**
     * MobX action that marks a key as in flight (`true`) or finished (`false`).
     *
     * @param key Identifier to add to or remove from `inPipe`.
     * @param inPipe Whether the key should be present in the set afterwards.
     */
    setInPipe(key: number, inPipe: boolean): void {
        if (inPipe) {
            this.inPipe.add(key);
        } else {
            this.inPipe.delete(key);
        }
    }

    /** True while at least one key is recorded as in flight. */
    get hasInPipe(): boolean {
        return this.inPipe.size !== 0;
    }

    /** Number of keys currently recorded as in flight. */
    get nInPipe(): number {
        return this.inPipe.size;
    }
}

/**
 * Convenience wrapper around {@link PipeManager} that assigns keys itself and
 * signals when the pipeline drains.
 *
 * Keys are taken from an internal counter starting at 0 and incremented for
 * every submitted item. A MobX reaction watches the wrapped manager's
 * `nInPipe`; each time that count changes it is logged, and when it changes
 * from a positive value to zero the `onEmpty` subject is notified.
 */
export class PipeManager2<T, R = any> {
    private inc = 0;
    public readonly m: PipeManager<T, R>;

    /**
     * @param process Callback forwarded unchanged to the wrapped `PipeManager`.
     * @param onEmpty Subject that receives `next()` whenever the in-flight
     *   count drops back to zero.
     */
    constructor(process: (o: T) => Observable<R>, private onEmpty: Subject<void>) {
        this.m = new PipeManager<T, R>(process);

        // Tracked expression: the reaction re-evaluates this and fires only
        // when the resulting count differs from the previous one.
        const inFlightCount = () => this.m.nInPipe;

        const onCountChanged = (current: number, previous: number): void => {
            console.log('PipeManager2', previous, current);
            const drained = previous > 0 && current === 0;
            if (!drained) {
                return;
            }
            // We went to zero
            onEmpty.next();
        };

        reaction(inFlightCount, onCountChanged)
    }

    /**
     * Submits an item under the next sequential key.
     *
     * @param object Payload handed to the wrapped manager.
     */
    processNew(object: T) {
        const key = this.inc;
        this.inc += 1;
        this.m.process(key, object);
    }
}
