. NET asynchroniczny strumień odczytu/zapisu

[18]} próbowałem rozwiązać to ćwiczenie egzaminacyjne" Programowanie współbieżne " (w C#):

Wiedząc, że klasa Stream zawiera metody int Read(byte[] buffer, int offset, int size) i void Write(byte[] buffer, int offset, int size), zaimplementuj w C# metodę NetToFile, która kopiuje wszystkie dane otrzymane z instancji NetworkStream net do instancji FileStream file. Aby wykonać transfer, użyj asynchronicznych odczytów i zapisów synchronicznych, unikając blokowania jednego wątku podczas operacji odczytu. Transmisja kończy się, gdy operacja net read zwróci wartość 0. Aby uprościć, nie jest niezbędne do obsługi kontrolowanego anulowania operacji.

void NetToFile(NetworkStream net, FileStream file);

Próbowałem rozwiązać to ćwiczenie, ale zmagam się z pytaniem związanym z samym pytaniem. Ale najpierw mój kod:

public static void NetToFile(NetworkStream net, FileStream file) {
    byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
    int offset = 0; // read/write offset
    int nBytesRead = 0; // number of bytes read on each cycle

    IAsyncResult ar;
    do {
        // read partial content of net (asynchronously)
        ar = net.BeginRead(buffer,offset,buffer.Length,null,null);
        // wait until read is completed
        ar.AsyncWaitHandle.WaitOne();
        // get number of bytes read on each cycle
        nBytesRead = net.EndRead(ar);

        // write partial content to file (synchronously)
        fs.Write(buffer,offset,nBytesRead);
        // update offset
        offset += nBytesRead;
    }
    while( nBytesRead > 0);
}

Moje pytanie jest takie, że w oświadczeniu pytającym jest powiedziane:

Aby wykonać transfer, użyj asynchronicznego czyta i zapisuje synchronicznie, unikając jeden wątek do zablokowania podczas odczytu operacje

Nie jestem pewien jeśli moje rozwiązanie osiągnie to, co jest pożądane w tym ćwiczeniu, ponieważ używam AsyncWaitHandle.WaitOne(), aby poczekać, aż odczyt asynchroniczny zakończy się.

Z drugiej strony, naprawdę nie zastanawiam się, co ma być "nieblokującym" rozwiązaniem w tym scenariuszu, ponieważ zapis FileStream ma być wykonany synchronicznie... a żeby to zrobić, muszę poczekać aż NetworkStream przeczytam, żeby kontynuować FileStream pisanie, prawda?

Możesz mi z tym pomóc?

[ edytuj 1] Using callback solution

OK, jeśli zrozumiałem, coMitchel Sellers i willvv odpowiedział, doradzono mi użyć metody callback, aby przekształcić to w rozwiązanie "nieblokujące". Oto Mój kod:

byte[] buffer; // buffer

public static void NetToFile(NetworkStream net, FileStream file) {
    // buffer with same dimension as file stream data
    buffer = new byte[file.Length];
    //start asynchronous read
    net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}

//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
    //NetworkStream retrieve
    NetworkStream net = (NetworkStream) ar.IAsyncState;
    //get number of bytes read
    int nBytesRead = net.EndRead(ar);

    //write content to file
    //... and now, how do I write to FileStream instance without
    //having its reference??
    //fs.Write(buffer,0,nBytesRead);
}

Jak zapewne zauważyłeś, utknąłem na metodzie callback, ponieważ nie mam odniesienia do instancji FileStream, w której chcę wywołać "Write(...) "metoda.

Dodatkowo, nie jest to Bezpieczny wątek rozwiązanie, ponieważ pole byte[] jest odsłonięte i może być współdzielone pomiędzy współbieżnymi wywołaniami NetToFile. Nie wiem, jak rozwiązać ten problem bez ujawnienia tego byte[] pola w zewnętrznym zakresie... i jestem prawie pewien, że nie może być odsłonięty w ten sposób.

Nie chcę używać metody lambda lub anonimowej, ponieważ tego nie ma w programie kursu "Programowanie współbieżne".

Author: ROMANIA_engineer, 2009-10-09

6 answers

Będziesz musiał użyć wywołania zwrotnego z NetStream read, aby to obsłużyć. I szczerze mówiąc, łatwiej byłoby zawinąć logikę kopiowania do własnej klasy, aby można było zachować instancję aktywnych strumieni.

Tak bym podszedł (nie testowany):

public class Assignment1
{
    public static void NetToFile(NetworkStream net, FileStream file) 
    {
        var copier = new AsyncStreamCopier(net, file);
        copier.Start();
    }

    public static void NetToFile_Option2(NetworkStream net, FileStream file) 
    {
        var completedEvent = new ManualResetEvent(false);

        // copy as usual but listen for completion
        var copier = new AsyncStreamCopier(net, file);
        copier.Completed += (s, e) => completedEvent.Set();
        copier.Start();

        completedEvent.WaitOne();
    }

    /// <summary>
    /// The Async Copier class reads the input Stream Async and writes Synchronously
    /// </summary>
    public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private byte[] buffer = new byte[4096];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            if (Completed != null)
            {
                Completed(this, EventArgs.Empty);
            }
        }
    }
}
 12
Author: bendewey,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-10-08 22:12:05

Mimo, że pomoc ludziom w odrabianiu lekcji jest sprzeczna z prawdą, biorąc pod uwagę, że ma ona ponad rok, oto właściwy sposób, aby to osiągnąć. Wszystko, czego potrzebujesz do nakładania się operacji odczytu/zapisu-nie jest wymagane tworzenie dodatkowych wątków ani nic innego.

public static class StreamExtensions
{
    private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
    public static void CopyTo( this Stream input , Stream output )
    {
        input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
        return ;
    }
    public static void CopyTo( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 }                                       ;
        int          bufno = 0 ;
        IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
        IAsyncResult write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            read.AsyncWaitHandle.WaitOne() ; 
            bufl[bufno] = input.EndRead(read) ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break ;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }

            // start the new write operation
            write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            write.AsyncWaitHandle.WaitOne() ;
            output.EndWrite(write) ;
        }

        output.Flush() ;

        // return to the caller ;
        return ;
    }


    public static async Task CopyToAsync( this Stream input , Stream output )
    {
        await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ;
        return;
    }

    public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 } ;
        int          bufno = 0 ;
        Task<int>    read  = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
        Task         write = null ;

        while ( true )
        {

            await read ;
            bufl[bufno] = read.Result ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                await write ;
            }

            // start the new write operation
            write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            await write;
        }

        output.Flush();

        // return to the caller ;
        return;
    }

}
Zdrówko.
 51
Author: Nicholas Carey,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-04 17:45:02

Wątpię, aby to był najszybszy kod (jest trochę narzutu z abstrakcji Zadań. NET), ale myślę, że jest to podejściecleaner do całej sprawy z kopiowaniem asynchronicznym.

Potrzebowałem CopyTransformAsync gdzie mógłbym przekazać delegata do zrobienia czegoś, ponieważ kawałki były przekazywane przez operację kopiowania. np. Oblicz fragment wiadomości podczas kopiowania. Dlatego zainteresowałem się własną opcją.

Wyniki:

  • CopyToAsync bufferSize jest wrażliwy (duży bufor jest wymagany)
  • FileOptions.Asynchroniczny - > sprawia, że jest strasznie powolny (Nie wiem dokładnie, dlaczego tak jest)
  • Rozmiar bufora obiektów FileStream może być mniejszy (nie jest to takie ważne)]}
  • test Serial jest oczywiście najszybszy i najbardziej zasobochłonny

Oto co znalazłem i kompletny kod źródłowy dla programu, którego użyłem do przetestowania tego. Na moim komputerze testy te były uruchamiane na dysku SSD i są odpowiednikiem kopii pliku. Zwykle nie chcesz używać tego do kopiowania plików, zamiast tego, gdy masz strumień sieciowy (który jest moim przypadkiem użycia), wtedy chcesz użyć czegoś takiego.

4K buffer

Serial...                                in 0.474s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    timed out
CopyTransformAsync (Asynchronous)...     timed out

8K buffer

Serial...                                in 0.344s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 1.116s
CopyTransformAsync (Asynchronous)...     timed out

40K buffer

Serial...                                in 0.195s
CopyToAsync...                           in 0.624s
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 0.378s
CopyTransformAsync (Asynchronous)...     timed out

80K buffer

Serial...                                in 0.190s
CopyToAsync...                           in 0.355s
CopyToAsync (Asynchronous)...            in 1.196s
CopyTransformAsync...                    in 0.300s
CopyTransformAsync (Asynchronous)...     in 0.886s

160K buffer

Serial...                                in 0.432s
CopyToAsync...                           in 0.252s
CopyToAsync (Asynchronous)...            in 0.454s
CopyTransformAsync...                    in 0.447s
CopyTransformAsync (Asynchronous)...     in 0.555s

Tutaj możesz zobaczyć Process Explorer, wykres wydajności podczas wykonywania testu. W zasadzie każdy Górny (w dolnym z trzech Wykresów) jest początkiem testu seryjnego. Widać wyraźnie, jak zwiększa się przepustowość wraz ze wzrostem rozmiaru bufora. Wygląda na to, że planuje gdzieś około 80K, czyli tego, czego używa metoda. NET Framework CopyToAsync, wewnętrznie.

Wykres Wydajności

Fajne jest to, że ostateczna implementacja nie była aż tak skomplikowana:

static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
    , Stream outputStream
    , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
    )
{
    var temp = new byte[bufferSize];
    var temp2 = new byte[bufferSize];

    int i = 0;

    var readTask = inputStream
        .ReadAsync(temp, 0, bufferSize)
        .ConfigureAwait(false);

    var writeTask = CompletedTask.ConfigureAwait(false);

    for (; ; )
    {
        // synchronize read
        int read = await readTask;
        if (read == 0)
        {
            break;
        }

        if (i++ > 0)
        {
            // synchronize write
            await writeTask;
        }

        var chunk = new ArraySegment<byte>(temp, 0, read);

        // do transform (if any)
        if (!(transform == null))
        {
            chunk = transform(chunk);
        }

        // queue write
        writeTask = outputStream
            .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
            .ConfigureAwait(false);

        // queue read
        readTask = inputStream
            .ReadAsync(temp2, 0, bufferSize)
            .ConfigureAwait(false);

        // swap buffer
        var temp3 = temp;
        temp = temp2;
        temp2 = temp3;
    }

    await writeTask; // complete any lingering write task
}

Ta metoda przeplatania odczytu / zapisu pomimo ogromnych buforów jest o około 18% szybsza niż BCL CopyToAsync.

Z ciekawości zmieniłem wywołania asynchroniczne na typowe wywołania typu begin/end async i to nie poprawiło ani trochę sytuacji, tylko pogorszyło sytuację. Na wszystko, co lubię bash na abstrakcji zadań overhead, robią kilka fajnych rzeczy, gdy piszesz kod z asynchronicznych / oczekują słowa kluczowe i jest o wiele przyjemniej czytać ten kod!

 15
Author: John Leidegren,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-03-27 11:24:49

Wow, to wszystko jest bardzo skomplikowane! Oto moje rozwiązanie asynchroniczne, a to tylko jedna funkcja. Read() i BeginWrite () działają w tym samym czasie.

/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
///  and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If no data can be read for 60 seconds, the copy will time-out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
    // This stream copy supports a source-read happening at the same time
    // as target-write.  A simpler implementation would be to use just
    // Write() instead of BeginWrite(), at the cost of speed.

    byte[] readbuffer = new byte[4096];
    byte[] writebuffer = new byte[4096];
    IAsyncResult asyncResult = null;

    for (; ; )
    {
        // Read data into the readbuffer.  The previous call to BeginWrite, if any,
        //  is executing in the background..
        int read = source.Read(readbuffer, 0, readbuffer.Length);

        // Ok, we have read some data and we're ready to write it, so wait here
        //  to make sure that the previous write is done before we write again.
        if (asyncResult != null)
        {
            // This should work down to ~0.01kb/sec
            asyncResult.AsyncWaitHandle.WaitOne(60000);
            target.EndWrite(asyncResult); // Last step to the 'write'.
            if (!asyncResult.IsCompleted) // Make sure the write really completed.
                throw new IOException("Stream write failed.");
        }

        if (read <= 0)
            return; // source stream says we're done - nothing else to read.

        // Swap the read and write buffers so we can write what we read, and we can
        //  use the then use the other buffer for our next read.
        byte[] tbuf = writebuffer;
        writebuffer = readbuffer;
        readbuffer = tbuf;

        // Asynchronously write the data, asyncResult.AsyncWaitHandle will
        // be set when done.
        asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
    }
}
 11
Author: Kenzi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-01-04 18:44:18

To dziwne, że nikt nie wspomniał o TPL.
Tutaj ' s very nice post by PFX team (Stephen Toub) about how to implementation concurrent async stream copy. Post zawiera nieaktualne refenrece do sampli więc tutaj jest jeden corrent:
Get Parallel Extensions Extras from code.msdn then

var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();

Rozważ również użycie J. Richer ' s AsyncEnumerator .

 9
Author: Shrike,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-11 16:51:19

Masz rację, to co robisz to w zasadzie synchroniczny odczyt, ponieważ używasz metody WaitOne() i po prostu zatrzymuje wykonywanie, dopóki dane nie będą gotowe, to w zasadzie to samo, co robienie tego za pomocą read() zamiast BeginRead () i EndRead ().

To co musisz zrobić, to użyć argumentu callback w metodzie BeginRead (), z nim definiujesz metodę callback (lub wyrażenie lambda), ta metoda zostanie wywołana po odczytaniu informacji (w metodzie callback można musisz sprawdzić koniec strumienia i zapisać do strumienia wyjściowego), w ten sposób nie będziesz blokował głównego wątku (nie będziesz potrzebował WaitOne () ani EndRead().

Mam nadzieję, że to pomoże.

 0
Author: willvv,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-10-08 21:56:30