Implementacja coroutines w Javie

To pytanie jest związane z moim pytaniem o istniejące implementacje coroutine w Javie. Jeśli, jak podejrzewam, okaże się, że w Javie nie ma pełnej implementacji coroutines, to co by było potrzebne do ich implementacji?

Jak już powiedziałem w tym pytaniu, wiem o co chodzi:

  1. możesz zaimplementować "coroutines" jako wątki/pule wątków za kulisami.
  2. możesz zrobić tricksy rzeczy z JVM bytecode za kulisami, aby koroutines możliwe.
  3. tak zwana "Maszyna Da Vinci" implementacja JVM ma prymitywy, które sprawiają, że coroutines są wykonalne bez manipulacja kodem bajtowym.
  4. istnieją również różne podejścia oparte na JNI do korutyn.

Zajmę się po kolei brakami każdego z nich.

Koroutiny oparte na wątkach

To "rozwiązanie" jest patologiczne. Głównym celem coroutines jest unikanie narzutu wątków, blokowania, szeregowania jądra itp. Coroutines mają być lekkie i szybkie oraz działać tylko w przestrzeni użytkownika. Wdrożenie ich w zakresie gwintów full-tilt z ciasnymi ograniczeniami pozbawia wszystkich zalet.

JVM bytecode manipulation

To rozwiązanie jest bardziej praktyczne, choć trochę trudne do wykonania. Jest to mniej więcej to samo, co przeskakiwanie do języka asemblacji dla bibliotek coroutine w C (czyli ile z nich działa) z tą zaletą, że masz tylko jedną architekturę do zmartwień na prawo i prawo.

Wiąże się to również z uruchamianiem kodu tylko na stosach JVM w pełni zgodnych (co oznacza, na przykład, brak Androida), chyba że znajdziesz sposób, aby zrobić to samo na stosie niezgodnym. Jeśli jednak znajdziesz sposób, aby to zrobić, teraz podwoiłeś złożoność systemu i potrzeby testowania.

Maszyna Da Vinci

Maszyna Da Vinci jest fajna do eksperymentowania, ale ponieważ nie jest to standardowy JVM, jego funkcje nie będą dostępne wszędzie. Podejrzewam, że większość środowisk produkcyjnych zabraniałaby używania maszyny Da Vinci. W ten sposób mógłbym użyć tego do fajnych eksperymentów, ale nie dla żadnego kodu, który spodziewam się wydać w prawdziwym świecie.

Ma również dodany problem podobny do powyższego rozwiązania manipulacji kodem bajtowym JVM: nie będzie działać na alternatywnych stosach (takich jak Android).

Implementacja JNI

To rozwiązanie sprawia, że sens robienia tego w Javie jest zupełnie dyskusyjny. Każda kombinacja procesora i systemu operacyjnego wymaga niezależnych testów i każda jest punktem potencjalnie frustrującej subtelnej awarii. Alternatywnie, oczywiście, mógłbym związać się do jednej platformy całkowicie, ale to też sprawia, że sens robienia rzeczy w Javie jest całkowicie dyskusyjny.

Więc...

Czy Jest jakiś sposób na zaimplementowanie corutines w Javie bez użycia jednej z tych czterech technik? Czy też będę zmuszony użyć jednego z tych czterech, które najmniej pachną (manipulacja JVM) zamiast tego?


Edited to add:

Aby upewnić się, że zamieszanie jest opanowane, jest to powiązane pytanie do mojego drugiego , ale nie to samo. Ten szuka istniejącego wdrożenia w celu uniknięcia niepotrzebnego wymyślania koła. To jest pytanie dotyczące tego, w jaki sposób można by wdrożyć coroutines w Javie, gdyby drugie okazało się bez odpowiedzi. Intencją jest utrzymanie różnych pytań na różnych nici.

Author: Community, 2010-05-17

6 answers

Rzuciłbym na to okiem: http://www.chiark.greenend.org.uk / ~sgtatham/coroutines.html , jest dość interesujący i powinien zapewnić dobre miejsce do rozpoczęcia. Ale oczywiście używamy Javy, więc możemy zrobić lepiej (a może gorzej, bo nie ma makr:))

Z mojego zrozumienia z koroutine zwykle masz producent i konsument koroutine (lub przynajmniej jest to najczęstszy wzór). Ale semantycznie nie chcesz, aby producent zadzwoń do konsumenta lub visa-versa, ponieważ wprowadza to asymetrię. Ale biorąc pod uwagę sposób, w jaki działają języki oparte na stosie, będziemy musieli poprosić kogoś o wywołanie.

Oto bardzo prosta hierarchia typów:

public interface CoroutineProducer<T>
{
    public T Produce();
    public boolean isDone();
}

public interface CoroutineConsumer<T>
{
    public void Consume(T t);
}

public class CoroutineManager
{
    public static Execute<T>(CoroutineProducer<T> prod, CoroutineConsumer<T> con)
    {
        while(!prod.IsDone()) // really simple
        {
            T d = prod.Produce();
            con.Consume(d);
        }
    }
}

Teraz oczywiście najtrudniejszą częścią jest implementacja interfejsów, w szczególności trudno jest rozbić obliczenia na poszczególne etapy. Do tego prawdopodobnie potrzebujesz całego innego zestawu trwałych struktur kontrolnych. Podstawową ideą jest że chcemy symulować nielokalny transfer kontroli(w końcu to tak jak symulujemy goto). Zasadniczo chcemy odejść od używania stosu i pc (program-counter), utrzymując stan naszych bieżących operacji w stercie zamiast na stosie. Dlatego będziemy potrzebować kilku klas pomocników.

Na przykład:

Powiedzmy, że w idealnym świecie chciałeś napisać konsumenta, który wyglądał tak (psuedocode):

boolean is_done;
int other_state;
while(!is_done)
{
    //read input
    //parse input
    //yield input to coroutine
    //update is_done and other_state;
}

We potrzebujemy abstrakcji zmiennej lokalnej jak is_done i other_state i musimy abstrakcji samej pętli while, ponieważ nasza operacja yield like nie będzie używać stosu. Stwórzmy więc abstrakcję pętli while I związane z nią klasy:

enum WhileState {BREAK, CONTINUE, YIELD}
abstract class WhileLoop<T>
{
    private boolean is_done;
    public boolean isDone() { return is_done;}
    private T rval;
    public T getReturnValue() {return rval;} 
    protected void setReturnValue(T val)
    {
        rval = val;
    }


    public T loop()
    {
        while(true)
        {
            WhileState state = execute();
            if(state == WhileState.YIELD)
                return getReturnValue();
            else if(state == WhileState.BREAK)
                    {
                       is_done = true;
                return null;
                    }
        }
    }
    protected abstract WhileState execute();
}

Podstawową sztuczką jest przeniesienie lokalnych zmiennych na zmienne klasy i przekształcenie bloków zakresu w klasy, co daje nam możliwość "ponownego wprowadzenia" naszej "pętli" po uzyskaniu zwracanej wartości.

Teraz do wdrożenia nasz producent

public class SampleProducer : CoroutineProducer<Object>
{
    private WhileLoop<Object> loop;//our control structures become state!!
    public SampleProducer()
    {
        loop = new WhileLoop()
        {
            private int other_state;//our local variables become state of the control structure
            protected WhileState execute() 
            {
                //this implements a single iteration of the loop
                if(is_done) return WhileState.BREAK;
                //read input
                //parse input
                Object calcluated_value = ...;
                //update is_done, figure out if we want to continue
                setReturnValue(calculated_value);
                return WhileState.YIELD;
            }
        };
    }
    public Object Produce()
    {
        Object val = loop.loop();
        return val;
    }
    public boolean isDone()
    {
        //we are done when the loop has exited
        return loop.isDone();
    }
}

Podobne sztuczki można wykonać dla innych podstawowych struktur przepływu sterowania. Najlepiej byłoby zbudować bibliotekę tych klas pomocniczych, a następnie użyć ich do zaimplementowania tych prostych interfejsów, które ostatecznie dadzą ci semantykę wspólnych procedur. Jestem pewien, że wszystko, co tu napisałem, może być uogólnione i znacznie rozszerzone.

 26
Author: luke,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-05-17 16:41:53

Proponuję spojrzeć na koroutiny Kotlina na JVM . Należy jednak do innej kategorii. Nie ma manipulacji kodem bajtowym i działa również na Androidzie. Jednak będziesz musiał napisać swoje coroutines w Kotlinie. Plusem jest to, że Kotlin został zaprojektowany z myślą o interoperacyjności z Javą, więc możesz nadal używać wszystkich bibliotek Javy i swobodnie łączyć Kod Kotlin i Java w tym samym projekcie, nawet umieszczając je obok siebie w tych samych katalogach i paczki.

Ten Przewodnik po kotlinx.coroutines dostarcza o wiele więcej przykładów, podczas gdy dokument coroutines design wyjaśnia wszystkie motywacje, przypadki użycia i szczegóły implementacji.

 6
Author: Roman Elizarov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-04-05 10:10:49

Właśnie natknąłem się na to pytanie i chcę tylko wspomnieć, że myślę, że możliwe jest zaimplementowanie corutynów lub generatorów w podobny sposób, jak robi to C#. To powiedziawszy nie używam Javy, ale CIL ma dość podobne ograniczenia jak JVM.

Instrukcja yield W C# jest czystą cechą języka i nie jest częścią kodu bajtowego CIL. Kompilator C# tworzy po prostu ukrytą klasę prywatną dla każdej funkcji generatora. Jeśli użyjesz deklaracji wydajności w funkcji to musi zwrócić licznik lub liczbę. Kompilator "pakuje" Twój kod do klasy podobnej do statemachine.

Kompilator C# może użyć "goto" w wygenerowanym kodzie, aby ułatwić konwersję do maszyny Stanów. Nie znam możliwości kodu bajtowego Javy i jeśli jest coś takiego jak zwykły skok bezwarunkowy, ale na" poziomie montażu " jest to zazwyczaj możliwe.

Jak już wspomniano ta funkcja musi być zaimplementowana w kompilatorze. Bo mam niewiele wiedza o Javie i jej kompilatorze Nie wiem czy da się zmienić / rozszerzyć kompilator, może za pomocą "preprocesora" czy jakoś tak.

Osobiście uwielbiam koroutiny. Jako deweloper Unity games używam ich dość często. Ponieważ gram dużo w Minecrafta z ComputerCraft byłem ciekaw dlaczego corutines w Lua (LuaJ) są zaimplementowane z wątkami.

 3
Author: Bunny83,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-04-20 12:13:48

Kotlin stosuje następujące podejście do Ko-procedur
(z https://kotlinlang.org/docs/reference/coroutines.html):

Coroutines są całkowicie zaimplementowane przez technikę kompilacji (nie jest wymagane wsparcie od strony VM lub OS), a zawieszenie działa poprzez transformację kodu. Zasadniczo każda funkcja zawieszająca (optymalizacje mogą mieć zastosowanie, ale nie będziemy tutaj wchodzić w to) jest przekształcana w maszynę stanową, w której Stany odpowiadają zawieszaniu wywołań. Tuż przed zawieszeniem, Następny stan jest przechowywany w polu generowanej przez kompilator klasy wraz z odpowiednimi zmiennymi lokalnymi itp. Po wznowieniu tego coroutine, zmienne lokalne są przywracane, a maszyna stanowa przechodzi ze stanu zaraz po zawieszeniu.

Zawieszony koroutin może być przechowywany i przekazywany jako obiekt, który zachowuje swój zawieszony stan i lokalnie. Typ takich obiektów jest kontynuacją, a ogólna transformacja kodu opisana tutaj odpowiada do klasycznego stylu kontynuowania-przechodzenia. W związku z tym funkcje zawieszające przyjmują pod maską dodatkowy parametr typu Continuation.

Sprawdź dokument projektowy na https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md

 1
Author: gooboo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-04-06 04:39:11

Mam klasę Coroutine, której używam w Javie. Jest on oparty na wątkach i korzystanie z wątków ma tę zaletę, że umożliwia pracę równoległą, co na maszynach wielordzeniowych może być zaletą. Dlatego warto rozważyć podejście oparte na wątkach.

 0
Author: Howard Lovatt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-09-04 10:33:05

Jest inny wybór dla Java6 +

Implementacja koroutine Pythona:

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class CorRunRAII {
    private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>();

    public CorRunRAII add(CorRun resource) {
        if (resource == null) {
            return this;
        }
        resources.add(new WeakReference<>(resource));

        return this;
    }

    public CorRunRAII addAll(List<? extends CorRun> arrayList) {
        if (arrayList == null) {
            return this;
        }
        for (CorRun corRun : arrayList) {
            add(corRun);
        }

        return this;
    }

    @Override
    protected void finalize() throws Throwable {
        super.finalize();

        for (WeakReference<? extends CorRun> corRunWeakReference : resources) {
            CorRun corRun = corRunWeakReference.get();
            if (corRun != null) {
                corRun.stop();
            }
        }
    }
}

class CorRunYieldReturn<ReceiveType, YieldReturnType> {
    public final AtomicReference<ReceiveType> receiveValue;
    public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue;

    CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
        this.receiveValue = receiveValue;
        this.yieldReturnValue = yieldReturnValue;
    }
}

interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> {
    boolean start();
    void stop();
    void stop(final Throwable throwable);
    boolean isStarted();
    boolean isEnded();
    Throwable getError();

    ReceiveType getReceiveValue();
    void setResultForOuter(YieldReturnType resultForOuter);
    YieldReturnType getResultForOuter();

    YieldReturnType receive(ReceiveType value);
    ReceiveType yield();
    ReceiveType yield(YieldReturnType value);
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another);
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value);
}

abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

    private ReceiveType receiveValue;
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>();

    // Outside

    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AtomicBoolean isEnded = new AtomicBoolean(false);
    private Throwable error;

    private YieldReturnType resultForOuter;

    @Override
    public boolean start() {

        boolean isStarted = this.isStarted.getAndSet(true);
        if ((! isStarted)
                && (! isEnded())) {
            receive(null);
        }

        return isStarted;
    }

    @Override
    public void stop() {
        stop(null);
    }

    @Override
    public void stop(Throwable throwable) {
        isEnded.set(true);
        if (throwable != null) {
            error = throwable;
        }

        for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
            CorRun child = weakReference.get();
            if (child != null) {
                child.stop();
            }
        }
    }

    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    @Override
    public boolean isEnded() {
        return isEnded.get();
    }

    @Override
    public Throwable getError() {
        return error;
    }

    @Override
    public ReceiveType getReceiveValue() {
        return receiveValue;
    }

    @Override
    public void setResultForOuter(YieldReturnType resultForOuter) {
        this.resultForOuter = resultForOuter;
    }

    @Override
    public YieldReturnType getResultForOuter() {
        return resultForOuter;
    }

    @Override
    public synchronized YieldReturnType receive(ReceiveType value) {
        receiveValue = value;

        run();

        return getResultForOuter();
    }

    @Override
    public ReceiveType yield() {
        return yield(null);
    }

    @Override
    public ReceiveType yield(YieldReturnType value) {
        resultForOuter = value;
        return receiveValue;
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) {
        return yieldFrom(another, null);
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) {
        if (another == null || another.isEnded()) {
            throw new RuntimeException("Call null or isEnded coroutine");
        }

        potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

        synchronized (another) {
            boolean isStarted = another.start();
            boolean isJustStarting = ! isStarted;
            if (isJustStarting && another instanceof CorRunSync) {
                return another.getResultForOuter();
            }

            return another.receive(value);
        }
    }

    @Override
    public void run() {
        try {
            this.call();
        }
        catch (Exception e) {
            e.printStackTrace();

            stop(e);
            return;
        }
    }
}

abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

    private final ExecutorService childExecutorService = newExecutorService();
    private ExecutorService executingOnExecutorService;

    private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>());

    private final CorRun<ReceiveType, YieldReturnType> self;
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList;
    private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn;

    private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue;

    // Outside

    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AtomicBoolean isEnded = new AtomicBoolean(false);
    private Future<YieldReturnType> future;
    private Throwable error;

    private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>();

    CorRunThread() {
        executingOnExecutorService = childExecutorService;

        receiveQueue = new LinkedBlockingDeque<>();
        potentialChildrenCoroutineList = new ArrayList<>();

        self = this;
    }

    @Override
    public void run() {
        try {
            self.call();
        }
        catch (Exception e) {
            stop(e);
            return;
        }

        stop();
    }

    @Override
    public abstract YieldReturnType call();

    @Override
    public boolean start() {
        return start(childExecutorService);
    }

    protected boolean start(ExecutorService executorService) {
        boolean isStarted = this.isStarted.getAndSet(true);
        if (!isStarted) {
            executingOnExecutorService = executorService;
            future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self);
        }
        return isStarted;
    }

    @Override
    public void stop() {
        stop(null);
    }

    @Override
    public void stop(final Throwable throwable) {
        if (throwable != null) {
            error = throwable;
        }
        isEnded.set(true);

        returnYieldValue(null);
        // Do this for making sure the coroutine has checked isEnd() after getting a dummy value
        receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN);

        for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
            CorRun child = weakReference.get();
            if (child != null) {
                if (child instanceof CorRunThread) {
                    ((CorRunThread)child).tryStop(childExecutorService);
                }
            }
        }

        childExecutorService.shutdownNow();
    }

    protected void tryStop(ExecutorService executorService) {
        if (this.executingOnExecutorService == executorService) {
            stop();
        }
    }

    @Override
    public boolean isEnded() {
        return isEnded.get() || (
                future != null && (future.isCancelled() || future.isDone())
                );
    }

    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    public Future<YieldReturnType> getFuture() {
        return future;
    }

    @Override
    public Throwable getError() {
        return error;
    }

    @Override
    public void setResultForOuter(YieldReturnType resultForOuter) {
        this.resultForOuter.set(resultForOuter);
    }

    @Override
    public YieldReturnType getResultForOuter() {
        return this.resultForOuter.get();
    }

    @Override
    public YieldReturnType receive(ReceiveType value) {

        LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>();

        offerReceiveValue(value, yieldReturnValue);

        try {
            AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take();
            return takeValue == null ? null : takeValue.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    @Override
    public ReceiveType yield() {
        return yield(null);
    }

    @Override
    public ReceiveType yield(final YieldReturnType value) {
        returnYieldValue(value);

        return getReceiveValue();
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) {
        return yieldFrom(another, null);
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) {
        if (another == null || another.isEnded()) {
            throw new RuntimeException("Call null or isEnded coroutine");
        }

        boolean isStarted = false;
        potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

        synchronized (another) {
            if (another instanceof CorRunThread) {
                isStarted = ((CorRunThread)another).start(childExecutorService);
            }
            else {
                isStarted = another.start();
            }

            boolean isJustStarting = ! isStarted;
            if (isJustStarting && another instanceof CorRunSync) {
                return another.getResultForOuter();
            }

            TargetYieldReturnType send = another.receive(value);
            return send;
        }
    }

    @Override
    public ReceiveType getReceiveValue() {

        setLastCorRunYieldReturn(takeLastCorRunYieldReturn());

        return lastCorRunYieldReturn.receiveValue.get();
    }

    protected void returnYieldValue(final YieldReturnType value) {
        CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn;
        if (corRunYieldReturn != null) {
            corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value));
        }
    }

    protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
        receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue));
    }

    protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() {
        try {
            return receiveQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) {
        this.lastCorRunYieldReturn = lastCorRunYieldReturn;
    }

    protected ExecutorService newExecutorService() {
        return Executors.newCachedThreadPool(getThreadFactory());
    }

    protected ThreadFactory getThreadFactory() {
        return new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread thread, Throwable throwable) {
                        throwable.printStackTrace();
                        if (runnable instanceof CorRun) {
                            CorRun self = (CorRun) runnable;
                            self.stop(throwable);
                            thread.interrupt();
                        }
                    }
                });
                return thread;
            }
        };
    }
}

Teraz możesz używać korutin pythonicznych w ten sposób (np. liczby Fibonacciego)

Wersja Wątku:

class Fib extends CorRunThread<Integer, Integer> {

    @Override
    public Integer call() {
        Integer times = getReceiveValue();
        do {
            int a = 1, b = 1;
            for (int i = 0; times != null && i < times; i++) {
                int temp = a + b;
                a = b;
                b = temp;
            }
            // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller
            times = yield(a);
        } while (! isEnded());

        setResultForOuter(Integer.MAX_VALUE);
        return getResultForOuter();
    }
}

class MainRun extends CorRunThread<String, String> {

    @Override
    public String call() {

        // The fib coroutine would be recycled by its parent
        // (no requirement to call its start() and stop() manually)
        // Otherwise, if you want to share its instance and start/stop it manually,
        // please start it before being called by yieldFrom() and stop it in the end.
        Fib fib = new Fib();
        String result = "";
        Integer current;
        int times = 10;
        for (int i = 0; i < times; i++) {

            // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current`
            current = yieldFrom(fib, i);

            if (fib.getError() != null) {
                throw new RuntimeException(fib.getError());
            }

            if (current == null) {
                continue;
            }

            if (i > 0) {
                result += ",";
            }
            result += current;

        }

        setResultForOuter(result);

        return result;
    }
}

Sync (non-thread) version:

class Fib extends CorRunSync<Integer, Integer> {

    @Override
    public Integer call() {
        Integer times = getReceiveValue();

        int a = 1, b = 1;
        for (int i = 0; times != null && i < times; i++) {
            int temp = a + b;
            a = b;
            b = temp;
        }
        yield(a);

        return getResultForOuter();
    }
}

class MainRun extends CorRunSync<String, String> {

    @Override
    public String call() {

        CorRun<Integer, Integer> fib = null;
        try {
            fib = new Fib();
        } catch (Exception e) {
            e.printStackTrace();
        }

        String result = "";
        Integer current;
        int times = 10;
        for (int i = 0; i < times; i++) {

            current = yieldFrom(fib, i);

            if (fib.getError() != null) {
                throw new RuntimeException(fib.getError());
            }

            if (current == null) {
                continue;
            }

            if (i > 0) {
                result += ",";
            }
            result += current;
        }

        stop();
        setResultForOuter(result);

        if (Utils.isEmpty(result)) {
            throw new RuntimeException("Error");
        }

        return result;
    }
}

Wykonanie (obie wersje będą działać):

// Run the entry coroutine
MainRun mainRun = new MainRun();
mainRun.start();

// Wait for mainRun ending for 5 seconds
long startTimestamp = System.currentTimeMillis();
while(!mainRun.isEnded()) {
    if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) {
        throw new RuntimeException("Wait too much time");
    }
}
// The result should be "1,1,2,3,5,8,13,21,34,55"
System.out.println(mainRun.getResultForOuter());
 0
Author: John Lee,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-28 08:53:50