Stataskscheduler i STA thread message

TL;DR: impas wewnątrz zadania uruchamianego przez StaTaskScheduler. Wersja długa:

Używam StaTaskScheduler z ParallelExtensionsExtras przez zespół Parallel, aby hostować niektóre starsze obiekty STA COM dostarczone przez stronę trzecią. Opis implementacji StaTaskScheduler mówi co następuje:

Dobrą wiadomością jest to, że implementacja TPL jest w stanie uruchomić się na albo Wątków MTA lub STA i uwzględnia istotne różnice wokół podstawowe API jak WaitHandle.WaitAll (który obsługuje tylko MTA wątków, gdy metoda jest dostarczana z wieloma uchwytami oczekiwania).

Myślałem, że to oznacza, że blokujące części TPL będą używać wait API, które pompuje wiadomości, JAK CoWaitForMultipleHandles, aby uniknąć sytuacji impasu podczas wywoływania wątku STA.

W mojej sytuacji, wierzę, że dzieje się co następuje: in-proc sta com obiekt A wykonuje połączenie do out-of-proc obiekt B, a następnie oczekuje callback z B poprzez jako część połączenia wychodzącego.

W formie uproszczonej:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);
Problem w tym, że nigdy nie wraca. O ile wiem, dzieje się tak, ponieważ blokowanie czekania gdzieś wewnątrz BlockingCollection<Task> nie pompuje wiadomości, więc moje założenie co do cytowanej wypowiedzi jest prawdopodobnie błędne.

Edytowany ten sam kod działa, gdy jest wykonywany w wątku UI aplikacji testowej WinForms (tj. podając TaskScheduler.FromCurrentSynchronizationContext() zamiast staTaskScheduler do Task.Factory.StartNew).

Jakie jest prawo jak to rozwiązać? Czy powinienem zaimplementować własny kontekst synchronizacji, który jawnie pompowałby wiadomości za pomocą CoWaitForMultipleHandles i instalowałby je na każdym wątku STA rozpoczętym przez StaTaskScheduler?

Jeśli tak, to czy podstawowa implementacja BlockingCollection będzie wywoływać moje SynchronizationContext.Wait metoda? Czy mogę użyć SynchronizationContext.WaitHelper aby wdrożyć SynchronizationContext.Wait?


EDITED with some code showing that a managed sta thread doesn ' t pump when roing a blocking wait. Kod jest kompletną aplikacją konsolową do kopiowania / wklejania / uruchamiania:
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

To daje wyjście:

Testing without pumping...
The collection argument is empty and has been marked as complete with regards to additions.

Test with pumping...
The collection argument is empty and has been marked as complete with regards to additions.
Now start pumping...
WM_TEST processed
Press Enter to exit
Author: avo, 2014-01-19

2 answers

Moje zrozumienie twojego problemu: używasz StaTaskScheduler tylko do zorganizowania klasycznego mieszkania COM STA dla twoich starych obiektów COM. Nie uruchamiasz pętli wiadomości WinForms lub WPF w wątku STA StaTaskScheduler. Oznacza to, że nie używasz niczego takiego jak Application.Run, Application.DoEvents albo Dispatcher.PushFrame wewnątrz tego wątku. Popraw mnie, jeśli to błędne założenie.

By itself, StaTaskScheduler nie instaluje żadnego kontekstu synchronizacji na utworzonych przez siebie wątkach STA. Tak więc, jesteś opierając się na CLR pompować wiadomości dla Ciebie. W 2011 roku, w ramach projektu "the CLR pumps", w ramach projektu "the CLR pumps", w ramach projektu " the CLR pumps in the STA threads, in Apartments and Pumping in the CLR by Chris Brumme:

Powtarzam, że zarządzane blokowanie wykona "pewne pompowanie", gdy / align = "left" / Czy nie byłoby wspaniale wiedzieć dokładnie, co będą pompowane? Niestety pompowanie to czarna sztuka, która jest poza śmiertelnym zrozumieniem. Na Win2000 i up, po prostu delegujemy do Serwis OLE32

Oznacza to, że CLR używa CoWaitForMultipleHandles wewnętrznie dla wątków STA. Ponadto dokumenty MSDN dla znacznika COWAIT_DISPATCH_WINDOW_MESSAGES wspominają o tym :

... w STA jest tylko mały zestaw wiadomości wysyłanych w specjalnej obudowie.

Zrobiłem kilka badań na ten , ale nie mogłem dostać się do pompowania WM_TEST z twojego przykładowego kodu z CoWaitForMultipleHandles, omówiliśmy to w komentarzach do twojego pytania. My zrozumienie jest, wspomniany mały zestaw wiadomości specjalnych jest naprawdę ograniczona do niektórych komunikatów specjalnych i nie zawiera żadnych regularnych wiadomości ogólnego przeznaczenia, takich jak twoje WM_TEST.

Więc, aby odpowiedzieć na twoje pytanie:

... Czy powinienem zaimplementować Niestandardowy kontekst synchronizacji, który bezpośrednio pompuj wiadomości za pomocą CoWaitForMultipleHandles i zainstaluj je na każdym wątku STA rozpoczętym przez StaTaskScheduler?

Tak, uważam, że stworzenie własnego kontekstu synchronizacji i nadpisanie SynchronizationContext.Wait jest rzeczywiście właściwym rozwiązaniem.

Należy jednak unikać używania CoWaitForMultipleHandlesi MsgWaitForMultipleObjectsEx zamiast . Jeśli MsgWaitForMultipleObjectsEx wskazuje, że w kolejce jest oczekująca wiadomość, należy ręcznie pompować ją za pomocą PeekMessage(PM_REMOVE) i DispatchMessage. Następnie należy nadal czekać na uchwyty, wszystkie wewnątrz tego samego wywołania SynchronizationContext.Wait.

Uwaga jest subtelny, ale ważna różnica pomiędzy MsgWaitForMultipleObjectsEx A MsgWaitForMultipleObjects. Ten ostatni nie powraca i blokuje, jeśli w kolejce jest już widoczna wiadomość (np. z PeekMessage(PM_NOREMOVE) lub GetQueueStatus), ale nie jest usuwany. To nie jest dobre do pompowania, ponieważ obiekty COM mogą używać czegoś w rodzaju PeekMessage do sprawdzania kolejki komunikatów. Może to później spowodować blokadę MsgWaitForMultipleObjects, gdy nie jest to oczekiwane.

OTOH, MsgWaitForMultipleObjectsEx z MWMO_INPUTAVAILABLE flaga nie ma takich niedociągnięć i wróciłaby w tym case.

Jakiś czas temu stworzyłem własną wersję StaTaskScheduler (dostępne tutaj jako ThreadAffinityTaskScheduler) w celu rozwiązania innego problemu: utrzymanie puli wątków z powinowactwem wątków do kolejnych await kontynuacji. Powinowactwo wątku jest istotne jeśli używasz obiektów STA COM w wielu awaits. The original StaTaskScheduler wykazuje to zachowanie tylko wtedy, gdy jego pula jest ograniczona do 1 wątku.

Więc poszedłem do przodu i zrobiłem trochę więcej eksperymentowanie ze sprawą WM_TEST. Pierwotnie zainstalowałem instancję standardu SynchronizationContext zajęcia na wątku STA. WM_TEST wiadomość nie została napompowana, co było oczekiwane.

Then I overridden SynchronizationContext.Wait aby przesłać go doSynchronizationContext.WaitHelper. Dzwonili, ale i tak nie pompowali.

W końcu zaimplementowałem w pełni funkcjonalną pętlę pompy komunikatów, oto jej rdzeń:]}
// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

To działa, WM_TEST jest pompowane. Poniżej znajduje się dostosowana wersja testu:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

Wyjście :

Initial thread #9
On STA thread #10
Post some WM_TEST messages...
Press Enter to continue...
WM_TEST processed: 1
WM_TEST processed: 2
WM_TEST processed: 3

After await, thread #10
Pending messages in the queue: False
Exiting STA thread #10
Current thread #12
Press any key to exit

Uwaga Ta implementacja obsługuje zarówno powinowactwo wątku (pozostaje w wątku #10 po await), jak i pompowanie wiadomości. Pełny kod źródłowy zawiera części wielokrotnego użytku (ThreadAffinityTaskScheduler i ThreadWithAffinityContext) i jest dostępny tutaj jako samodzielna aplikacja konsoli . Nie został dokładnie przetestowany, więc używaj go na własne ryzyko.

 35
Author: noseratio,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:22

Temat pompowania wątków STA jest duży, a bardzo niewielu programistów ma przyjemny czas na rozwiązywanie impasu. Przełomowy artykuł o tym został napisany przez Chrisa Brumme, głównego mądrego faceta, który pracował nad .NET. znajdziesz go w ten wpis na blogu . Niestety jest to raczej krótki w szczegółach, nie wykracza poza to, że CLR robi bit pompowania, ale bez żadnych szczegółów na temat dokładnych zasad.

Kod, o którym mówi, dodany w. NET 2.0, to obecne w wewnętrznej funkcji CLR o nazwie MsgWaitHelper (). Kod źródłowy. NET 2.0 jest dostępny za pośrednictwem dystrybucji SSCLI20. Bardzo kompletny, ale źródło MsgWaitHelper () nie jest włączone. Dość nietypowe. Dekompilacja jest raczej przegraną przyczyną, jest bardzo duża.

[[3]}jedyną rzeczą, którą należy zabrać z jego posta na blogu, jest niebezpieczeństwo ponownego wejścia . Pompowanie wątku STA jest niebezpieczne ze względu na jego zdolność do wysyłania wiadomości Windows i uzyskiwania dowolnego kodu do wykonania gdy twój program nie jest w prawidłowym stanie, aby umożliwić wykonanie takiego kodu. Coś, co wie większość programistów VB6, kiedy użył metody DoEvents (), aby uzyskać pętlę modalną w swoim kodzie, aby przestać zamrażać interfejs użytkownika. Napisałem post o jego najbardziej typowych zagrożeniach. MsgWaitHelper() sam wykonuje ten sam rodzaj pompowania, jednak jest bardzo selektywne odnośnie dokładnie Jaki rodzaj kodu pozwala uruchomić.

Możesz uzyskać wgląd w to, co robi w twoim programie testowym, uruchamiając program bez dołączonego debuggera, a następnie dołączony debugger niezarządzany. Zobaczysz, że blokuje się w ntwaitformultipleobjects (). Zrobiłem krok dalej i ustawiłem punkt przerwania na PeekMessageW (), aby uzyskać ten ślad stosu:

user32.dll!PeekMessageW()   Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305  C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087    C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int)   Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode)   Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *)    Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *)  Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *)  Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int)    Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *)  Unknown

Uważaj, że nagrałem ten ślad stosu w systemie Windows 8.1, będzie wyglądał zupełnie inaczej w starszych wersjach systemu Windows. Pętla modalna COM została mocno majstrowana w Windows 8, jest to również bardzo duża sprawa dla programów WinRT. Don ' t know that much o tym, ale wydaje się, że ma inny model gwintowania STA o nazwie ASTA, który wykonuje bardziej restrykcyjny rodzaj pompowania, zapisany w dodanym cowaitformultipleobjects () {]}

ObjectNative:: WaitTimeout () jest tam, gdzie Semaforeslim.Wait () wewnątrz BlockingCollection.Metoda Take () rozpoczyna wykonywanie kodu CLR. Widać, że przedziera się przez poziomy wewnętrznego kodu CLR, aby dotrzeć do mitycznej funkcji MsgWaitHelper (), a następnie przełączyć się na niesławną pętlę dyspozytora modalnego COM.

The sygnałem nietoperza jest wywołanie metody Climodalloop::PeekRPCAndDDEMessage (). Innymi słowy, jest to tylko biorąc pod uwagę rodzaj wiadomości interop, które są wysyłane do określonego okna wewnętrznego, które wysyła połączenia COM, które przekraczają granicę mieszkania. Spowoduje to , a nie pompowanie wiadomości znajdujących się w kolejce wiadomości dla Twojego własnego okna.

Jest to zrozumiałe zachowanie, Windows może być tylko absolutnie pewien, że ponowne wejście nie zabije Twojego programu, gdy zobaczy, że Twój wątek UI jest idle. Jest bezczynny, gdy sam pompuje pętlę wiadomości, wywołanie PeekMessage() lub GetMessage() wskazuje ten stan. Problem w tym, że się nie pompujesz. Złamałeś umowę rdzenia wątku STA, to musi pompować pętlę wiadomości. Nadzieja, że pętla modalna com zrobi pompowanie dla Ciebie, jest więc bezczynną nadzieją.

Możesz to naprawić, chociaż nie polecam. Na CLR pozostawi to samej aplikacji, aby wykonać oczekiwanie przez odpowiednio skonstruowany SynchronizationContext.Bieżący obiekt. Możesz ją utworzyć, wyprowadzając własną klasę i nadpisując metodę Wait (). Wywołanie metody SetWaitNotificationRequired (), aby przekonać CLR, że powinna pozostawić to tobie. Niekompletna wersja demonstrująca podejście:
class MySynchronizationProvider : System.Threading.SynchronizationContext {
    public MySynchronizationProvider() {
        base.SetWaitNotificationRequired();
    }
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
        for (; ; ) {
            int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
            if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
            else return result;
        }
    }
    [DllImport("user32.dll")]
    private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
        int millisecondTimeout, int mask);        
}

I zainstaluj go na początku wątku:

    System.ComponentModel.AsyncOperationManager.SynchronizationContext =
        new MySynchronizationProvider();

Zobaczysz teraz wiadomość WM_TEST wysłana. To wezwanie do podania.DoEvents (), które go wysłało. Mogłem to zatuszować używając PeekMessage + DispatchMessage, ale to zaciemniłoby niebezpieczeństwo związane z tym kodem, najlepiej nie trzymać doevents () pod tabelą. Naprawdę grasz w bardzo niebezpieczną grę re-entrancy tutaj. Nie używaj tego kodu.

W skrócie, jedyną nadzieją na poprawne użycie StaThreadScheduler jest użycie go w kodzie, który już wdrożył kontrakt STA i pompuje jak wątek STA powinno wystarczyć. To było naprawdę przeznaczone jako plaster dla starego kodu, gdzie nie trzeba luksusu, aby kontrolować stan wątku. Jak każdy kod, który rozpoczął życie w programie VB6 lub dodatku Office. Eksperymentując trochę z tym, nie sądzę, aby to się udało. Godne uwagi jest też to, że potrzeba tego powinna być całkowicie wyeliminowana wraz z dostępnością asych/wait.

 16
Author: Hans Passant,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:26:12