Jak określiłbyś grupę goroutinów do stracenia w Golang?

TL;TR: proszę, przejdź do ostatniej części i powiedz mi, jak rozwiązałbyś ten problem.

Zacząłem używać Golang dziś rano z Pythona. Chcę wywołać plik wykonywalny o zamkniętym źródle Z Go kilka razy, z bitem współbieżności, z różnymi argumentami wiersza poleceń. Mój kod wynikowy działa dobrze, ale chciałbym uzyskać twój wkład, aby go poprawić. Ponieważ jestem na wczesnym etapie nauki, wyjaśnię również mój przepływ pracy.

Dla dobra dla uproszczenia, Załóżmy, że ten "zewnętrzny program o zamkniętym źródle" jest zenity, Linuksowym narzędziem wiersza poleceń, które może wyświetlać graficzne pola wiadomości z wiersza poleceń.

Wywołanie pliku wykonywalnego z Go

Więc, w Go, poszedłbym tak:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

To powinno działać w sam raz. Zauważ, że .Run() jest funkcjonalnym odpowiednikiem .Start(), po którym następuje .Wait(). To jest świetne, ale gdybym chciał uruchomić ten program tylko raz, całe programowanie nie byłoby warto. Zróbmy to kilka razy.

Wywołanie pliku wykonywalnego wiele razy

Teraz, gdy już to działa, chciałbym wywołać mój program wiele razy, z niestandardowymi argumentami wiersza poleceń(tutaj tylko i dla uproszczenia).

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}
Ok, udało się! Ale nadal nie widzę zalet Go over Python ... ten kawałek kodu jest faktycznie wykonywany w sposób seryjny. Mam procesor wielordzeniowy i chciałbym z niego skorzystać. Dodajmy więc część z goroutines. [[25]} Goroutines, czyli sposób, aby mój program był równoległy

A) pierwsza próba: po prostu dodaj"idź" wszędzie

Przepisz nasz kod, aby ułatwić wywołanie i ponowne użycie oraz dodaj słynne słowo kluczowe go:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
Nic! W czym problem? Wszystkie goroutiny są stracone naraz. Nie wiem, dlaczego zenity nie jest wykonywane, ale AFAIK, program Go zakończył się zanim zewnętrzny program Zenity mógł być nawet zainicjowany. Zostało to potwierdzone przez użycie time.Sleep: czekanie na kilka sekund wystarczyło, aby 8 instancja zenity sama się uruchomiła. Nie wiem, czy można to uznać za błąd. Co gorsza, prawdziwy program, który chciałbym wywołać, zajmuje trochę czasu, zanim sam się uruchomi. Jeśli wykonam 8 instancji tego programu równolegle na moim 4-rdzeniowym procesorze, będzie to tracić trochę czasu na przełączanie kontekstu ... Nie wiem, jak zachowują się zwykłe go goroutines, ale exec.Command uruchomi zenity 8 razy w 8 różnych wątkach. Żeby było jeszcze gorzej, chcę wykonać ten program ponad 100,000 razy. Robienie tego wszystkiego na raz w goroutines nie będzie w ogóle skuteczne. Mimo to chciałbym wykorzystać mój 4-rdzeniowy procesor!

B) druga próba: użyj puli goroutines

Zasoby internetowe zalecają użycie sync.WaitGroup do tego rodzaju pracy. Problem z tym podejściem polega na tym, że w zasadzie pracujesz z partiami goroutines: jeśli tworzenie WaitGroup 4 członków, program Go będzie czekał na Wszystkie 4 zewnętrzne programy, aby zakończyć przed wywołaniem nowej partii 4 programów. To nie jest wydajne: PROCESOR jest marnowany, po raz kolejny.

Niektóre inne zasoby zalecały użycie buforowanego kanału do wykonania pracy:]}
package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
To wygląda brzydko. Kanały nie były przeznaczone do tego celu: wykorzystuję efekt uboczny. Uwielbiam pojęcie defer, ale nie znoszę konieczności deklarowania funkcji (nawet lambda) do wyłóż wartość z fałszywego kanału, który stworzyłem. No i oczywiście używanie fałszywego kanału jest samo w sobie brzydkie.

C) trzecia próba: umrzeć, gdy wszystkie dzieci nie żyją

Już prawie skończyliśmy. Muszę tylko wziąć pod uwagę kolejny efekt uboczny: program Go zamyka się, zanim wszystkie wyskakujące okienka zenity zostaną zamknięte. Dzieje się tak, ponieważ gdy pętla jest skończona (w ósmej iteracji), nic nie stoi na przeszkodzie, aby program się skończył. Tym razem sync.WaitGroup będzie przydatne.
package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
Zrobione.

Moje pytania

  • znasz jakiś inny sposób na ograniczenie liczby straconych goroutinów naraz?

Nie mam na myśli wątków; jak Go zarządza goroutines wewnętrznie nie ma znaczenia. Mam na myśli ograniczenie liczby goroutines uruchomionych na raz: exec.Command tworzy nowy wątek za każdym razem, gdy jest wywoływany, więc powinienem kontrolować liczbę razy, kiedy jest wywoływany.

  • czy ten kod wygląda dobrze do ty?
  • Czy wiesz, jak uniknąć użycia fałszywego kanału w tym przypadku?
Nie mogę się przekonać, że takie fałszywe kanały są dobrą drogą.
Author: MaxVT, 2013-08-23

3 answers

Stworzyłbym 4 worker goroutines, które czytają zadania ze wspólnego kanału. Goroutiny, które są szybsze od innych (ponieważ są zaplanowane inaczej lub zdarzy się uzyskać proste zadania) otrzymają więcej zadań z tego kanału niż inne. Oprócz tego użyłbym sync.WaitGroup , aby czekać na zakończenie pracy wszystkich pracowników. Pozostała część to tylko tworzenie zadań. Przykładową implementację tego podejścia można zobaczyć tutaj:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

Są prawdopodobnie inne możliwe podejścia, ale myślę, że jest to bardzo czyste rozwiązanie, które jest łatwe do zrozumienia.

 83
Author: tux21b,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-06-23 18:00:13

Proste podejście do dławienia (wykonaj f() N razy, ale maksymalnie maxConcurrency jednocześnie), tylko schemat:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

Plac zabaw

Nie nazwałbym kanału throttle "manekinem". IMHO to elegancki sposób (to oczywiście nie mój wynalazek), jak ograniczyć współbieżność.

BTW: zauważ, że ignorujesz zwrócony błąd z cmd.Run().

 31
Author: zzzz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-08-23 14:33:57

Spróbuj tego: https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
        zenity(...) 
 })
 limiter.Wait()
 1
Author: korovkin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-13 18:39:32