Jak napisać do Kafki z Spark Streaming

Używam Spark Streaming do przetwarzania danych między dwoma kolejkami Kafka, ale nie mogę znaleźć dobrego sposobu, aby pisać na Kafka z Spark. Próbowałem tego:

input.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>

                partition.foreach{
                  case x:String=>{

                    val props = new HashMap[String, Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")

                    println(x)
                    val producer = new KafkaProducer[String,String](props)
                    val message=new ProducerRecord[String, String]("output",null,x)
                    producer.send(message)
                  }
                }


      )
    ) 

I działa zgodnie z przeznaczeniem, ale tworzenie nowego Kafkaproducenta dla każdej wiadomości jest wyraźnie niewykonalne w prawdziwym kontekście i staram się to obejść.

Kafkaproducent nie jest oczywiście serializowalny.

Chciałbym zachować odniesienie do pojedynczej instancji dla każdego procesu i uzyskać do niej dostęp, gdy będę potrzebował aby wysłać wiadomość. Jak mogę to zrobić?

Author: Chobeat, 2015-07-23

6 answers

Moją pierwszą radą byłoby spróbować stworzyć nową instancję w foreachPartition i zmierzyć, czy jest to wystarczająco szybkie dla Twoich potrzeb (tworzenie instancji ciężkich obiektów w foreachPartition jest tym, co sugeruje oficjalna dokumentacja).

Inną opcją jest użycie puli obiektów, jak pokazano w tym przykładzie:

Https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

Znalazłem jednak to trudne do wdrożenia przy użyciu checkpointing.

Kolejna wersja, która działa dobrze dla mnie jest fabryką opisaną w poniższym wpisie na blogu, wystarczy sprawdzić, czy zapewnia wystarczającą równoległość dla Twoich potrzeb (sprawdź sekcję komentarzy):

Http://allegro.tech/2015/08/spark-kafka-integracja.html

 17
Author: Marius Soutier,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-03-01 15:47:37

Tak, niestety Spark (1.x, 2.x) nie pozwala na proste pisanie do Kafki w sposób efektywny.

Proponuję następujące podejście:

  • Użyj (i ponownie użyj) jednej instancji KafkaProducer na proces wykonujący/JVM.
Oto konfiguracja wysokiego poziomu dla tego podejścia:
  1. najpierw musisz "owinąć" Kafkę KafkaProducer, ponieważ, jak wspomniałeś, nie jest to serializowalne. Owijanie go pozwala "wysłać" go do wykonawców. Kluczowa idea należy użyć lazy val, aby opóźnić tworzenie instancji producenta aż do jego pierwszego użycia, co jest efektywnym obejściem, dzięki czemu nie musisz się martwić o to, że KafkaProducer nie jest serializowalna.
  2. "wysyłasz" owiniętego producenta do każdego executora za pomocą zmiennej broadcast.
  3. w Twojej rzeczywistej logice przetwarzania, uzyskujesz dostęp do owiniętego producenta poprzez zmienną broadcast i używasz jej do zapisu wyników przetwarzania z powrotem do Kafki.

Poniższy fragment kodu działa z Spark Streaming od Spark 2.0.

Krok 1: Owijanie KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Krok 2: Użyj zmiennej broadcast, aby nadać każdemu wykonawcyowi własną owiniętą instancję KafkaProducer

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Krok 3: zapis z Spark Streaming do Kafka, ponowne użycie tej samej zawiniętej instancji KafkaProducer (dla każdego executora)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}
Mam nadzieję, że to pomoże.
 25
Author: Michael G. Noll,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-09-16 19:56:38

Jest Streaming Kafka Writer utrzymywany przez Cloudera (w rzeczywistości Spark JIRA [1]). Zasadniczo tworzy producenta na partycję, co amortyzuje czas spędzony na tworzeniu "ciężkich" obiektów nad (miejmy nadzieję, że dużą) kolekcją elementów.

Autora można znaleźć tutaj: https://github.com/cloudera/spark-kafka-writer

 8
Author: maasg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-23 23:31:58

Miałem ten sam problem i znalazłem ten post .

Autor rozwiązuje problem, tworząc 1 producenta na wykonawcę. Zamiast wysyłać samego producenta, wysyła tylko" przepis", jak stworzyć producenta w executorze, nadając go.

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

Używa wrappera, który leniwie tworzy producenta:

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

Owijarka jest serializowalna, ponieważ producent Kafki jest inicjowany tuż przed pierwszym użyciem na executorze. Sterownik zachowuje odniesienie do wrappera i wrapper wysyła wiadomości używając producenta każdego executora:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
 7
Author: gcaliari,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-01-31 20:52:52

Dlaczego jest to niewykonalne? Zasadniczo każda partycja każdego RDD będzie działać niezależnie (i może działać na innym węźle klastra), więc masz do ponownego połączenia (i dowolnej synchronizacji) na początku zadania każdej partycji. Jeśli narzut jest zbyt wysoki, należy zwiększyć rozmiar partii w StreamingContext, aż stanie się akceptowalny (obv. jest to koszt opóźnienia).

(Jeśli nie obsługujesz tysięcy wiadomości na każdej partycji, na pewno potrzebujesz spark-streamingu? Czy można zrobić lepiej z samodzielnej aplikacji?)

 3
Author: lmm,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-23 15:53:38

To może być to, co chcesz zrobić. Zasadniczo tworzysz jednego producenta dla każdej partycji rekordów.

input.foreachRDD(rdd =>
      rdd.foreachPartition(
          partitionOfRecords =>
            {
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String,String](props)

                partitionOfRecords.foreach
                {
                    case x:String=>{
                        println(x)

                        val message=new ProducerRecord[String, String]("output",null,x)
                        producer.send(message)
                    }
                }
          })
) 

Hope that helps

 2
Author: sainath reddy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-24 03:59:57