
How To Properly Use Pyspark To Send Data To Kafka Broker?

I'm trying to write a simple PySpark job that receives data from a Kafka broker topic, does some transformation on that data, and puts the transformed data on a different Kafka topic.

Solution 1:

Here is working code that reads from a Kafka topic into Spark and writes the data back out to a different Kafka topic:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer
import sys

# Producer used on the driver to publish the processed records
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    records = message.collect()
    for record in records:
        # KafkaProducer expects bytes, so encode each record
        producer.send('spark.out', str(record).encode('utf-8'))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
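
The question also mentions transforming the data before re-publishing. createDirectStream yields (key, value) pairs, so a transformation can be mapped over the stream before it reaches the handler. A minimal sketch (the upper-casing is just a hypothetical placeholder transformation), to go inside main() in place of the kvs.foreachRDD(handler) line:

    # Hypothetical transformation: take the value out of each
    # (key, value) pair and upper-case it before re-publishing
    transformed = kvs.map(lambda kv: kv[1].upper())
    transformed.foreachRDD(handler)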

Run it like this (s.py is the script above; localhost:9092 and test are the broker list and input topic read from sys.argv):

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test

The _2.10-1.6.1 suffix of the assembly JAR denotes the Scala and Spark versions, so use the build that matches your cluster.
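
One caveat: the handler above collects every record back to the driver and pushes them through a single producer, which is fine for a demo but won't scale. A common alternative (a minimal sketch, assuming the same localhost broker and spark.out topic as above) is to create a producer per partition on the executors instead:

from kafka import KafkaProducer

def send_partition(records):
    # Runs on an executor: one producer per partition, so records
    # never have to be collected back to the driver
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for record in records:
        producer.send('spark.out', str(record).encode('utf-8'))
    producer.flush()
    producer.close()

def handler(message):
    message.foreachPartition(send_partition)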
