TP4 : Traitement en temps réel avec Spark Streaming, Kafka et persistance avec Cassandra
Objectifs du TP : Au terme de ce TP, vous seriez capable de :
- Manipuler les structures de Spark à savoir RDD et Dataframes
- Tester Spark en mode cluster sans YARN
- Implémenter la pipeline HDFS, Spark, Hive et Sqoop pour effectuer un batch processing
Spark Streaming : le traitement en temps réel
Spark Streaming est une extension de l’API principale de Spark. Elle permet de créer des applications de streaming évolutives, à haut débit et tolérantes aux pannes, tout cela sans avoir à utiliser un ensemble d’outils complètement différent et en restant dans l’environnement de Spark.
Si vous considérez Spark comme une plateforme pour travailler avec des données par lots, alors Spark Streaming étend ce concept pour travailler avec des données en unités beaucoup plus petites, mais d’une manière très similaire. Nous appelons cela le micro-batching.
Spark Streaming peut fonctionner avec une liste impressionnante de sources de données de streaming préconfigurées, Kafka étant probablement la plus importante. Nous parlerons de Kafka plus en détail dans le module suivant. Vous remarquerez également qu’AKKA est présenté différemment. C’est parce qu’il s’agit de la forme la plus brute d’un fournisseur de flux. Spark fournit les interfaces et les connexions nécessaires pour que vous puissiez créer votre propre fournisseur de streaming en utilisant AKKA comme plateforme de messagerie. Spark utilise AKKA pour son implémentation, il est donc naturel de fournir une telle fonctionnalité.
Quoi qu’il en soit, Spark propose une longue liste de sources préconfigurées en plus de celles fournies par la communauté, que vous pouvez adopter, et offre des moyens d’agir sur les données reçues. Par exemple, vous pouvez sauvegarder les résultats dans HDFS ou dans des sources de données externes telles qu’Oracle, SQL Server, Elasticsearch, Cassandra, et bien d’autres.
Au cœur de Spark Streaming se trouve une classe appelée DStream. Un DStream n’est qu’une collection de RDD avec des informations temporelles. Il est également accompagné de fonctions supplémentaires, par exemple, pour maintenir l’état à mesure que votre application de streaming progresse et pour permettre des calculs de fenêtrage.
Prenons un moment pour comprendre comment fonctionne Spark Streaming. Nous avons évidemment besoin d’un flux de données en entrée qui alimente une application Spark Streaming. Il y a évidemment un processus qui permet à Spark Streaming d’accéder à ce jeu de données en entrée, mais nous n’allons pas nous concentrer là-dessus pour l’instant. Retenez simplement qu’il y a un processus représenté ici par une ligne pointillée verticale dans la diapositive, et ce processus est responsable de fournir les données. Il existe également un composant appelé block manager dans Spark, qui garde une trace de ces données lorsqu’elles arrivent dans l’univers Spark.
Dans chaque application de streaming, l’application doit définir un intervalle de lot. Il s’agit essentiellement de l’intervalle de micro-batch dont nous parlions plus tôt. Plus l’intervalle est court, plus la latence entre la disponibilité des données en entrée et le début de leur traitement est faible. Cependant, cela ne définit pas nécessairement le temps nécessaire pour terminer le traitement. Idéalement, le traitement se termine dans chaque intervalle de lot. L’intervalle agit simplement comme un déclencheur pour lancer les mêmes étapes de traitement sur le prochain lot de données. Cependant, si le traitement prend plus de temps que l’intervalle de lot, alors le traitement s’accumule. Si cela perdure, l’application devient instable et finit par tomber en panne.
Le point ici est qu’il y a plus d’implications à choisir un intervalle de lot que simplement contrôler la latence. Votre traitement doit être terminé dans ce délai. Le meilleur moyen de vérifier cela est de tester et de résoudre les goulots d’étranglement dans votre code en consultant l’interface utilisateur de Spark Streaming.
Lorsque vos données arrivent, elles sont placées en blocs ou en partitions. Un paramètre de configuration de Spark, appelé spark.streaming.blockInterval, contrôle l’intervalle auquel les données reçues par les récepteurs de Spark Streaming sont fragmentées en blocs ou partitions. Ces partitions sont liées au temps, et non à la taille. La collection de partitions dans un intervalle de temps similaire basé sur l’intervalle de lot crée collectivement un RDD. Une fois qu’un RDD est créé à partir d’un lot, il est mis en file d’attente et programmé pour exécution par le driver, qui, tout comme dans une application Spark normale, planifie l’exécution de votre code, des transformations et des actions contre les exécuteurs Spark.
Kafka: Distributed Stream Process
Manipulation 1: Manipulation kafka data production et Spark
Vous allez mettre votre fichier docker-compose.yml à jour en ajoutant les conteneurs kafka-iot et zookeeper-iot
zookeeper:
image: confluentinc/cp-zookeeper:5.1.0
hostname: zookeeper
container_name: zookeeper-iot
ports:
- 2181:2181
networks:
- net
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:5.1.0
ports:
- 9092:9092
- 29092:29092
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.request.logger=WARN"
KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
volumes:
- /var/run/docker.sock:/var/run/docker.sock
hostname: kafka
container_name: kafka-iot
networks:
- net
restart: always
Placez-vous là vous avez votre docker-compose.xml et lancez :
docker compose up -d
Pour supprimer le réseau, lancez la commande
docker compose down
Nous allons maintenant tester une logique de Producteur d’information qui est dans notre cas « Kafka » et un Consommateur qui est dans notre cas « KafkaWordCount.jar » lancé sur spark. Si jamais, vous n’avez pas installer maven sur le terminal, lancez :
apt-get install maven
Pour ce faire nous allons créer notre projet java mavenisé :
mvn archetype:generate -DgroupId=tn.enit.tp4 -DartifactId=kafka_wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Naviguez vers l’emplacement du fichier pom.xml, et mettre ce contenu :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tn.enit.tp4</groupId>
<artifactId>kafka_wordcount</artifactId>
<version>1</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--
Bind the maven-assembly-plugin to the package phase
this will create a jar file without the storm dependencies
suitable for deployment to a cluster.
-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>tn.enit.tp4.KafkaWordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Maintenant, placez-vous sous /src/main/java/tn/enit/tp4, effacez le fichier existant et remplacez-le par KafkaWordCount.java
package tn.enit.tp4;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
public class KafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
private KafkaWordCount() {
}
public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.err.println("Usage: SparkKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
// Creer le contexte avec une taille de batch de 2 secondes
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(2000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<String> lines = messages.map(Tuple2::_2);
JavaDStream<String> words =
lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts =
words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
Lancez maintenant la commande suivante tout en veillant à revenir là où il y a le fichier pom.xml.
mvn clean package
Cette commande vous générera le fichier executable jar que nous le copions sous le conteneur spark-master :
docker cp target/kafka_wordcount-1-jar-with-dependencies.jar spark-master:/
Maintenant, connectez-vous à spark-master
docker exec -it spark-master bash
Et lancez:
/spark/bin/spark-submit --class tn.enit.tp4.KafkaWordCount --master local[2] kafka_wordcount-1-jar-with-dependencies.jar zookeeper:2181 test Hello-Kafka 1 >> out
Laisser ce console et allez vous connecter au conteneur kafka-iot avec
docker exec -it kafka-iot bash
Placez-vous sous /usr/bin Lancez maintenant :
kafka-console-producer --broker-list localhost:9092 --topic Hello-Kafka
Envoyez votre message en l’écrivant sur le container de kafka. Maintenant, quittez kafka container et connectez-vous au container spark-master puis lancez
cat out
pour voir le résultat du traitement du message envoyé par kafka comme indiqué dans la figure.
Manipulation 2 : Speed Layer : Spark Streaming, kafka et cassandra
Nous allons créer 3 projets pour mettre à niveau la couche Speed Layer de notre architecture Lambda. Chaque projet sera lancé depuis un conteneur. Commençons tout d’abord par le Streaming Job assuré par Spark Streaming. Créer un projet spark-processor.
Traitement avec Spark Streaming
Copiez le fichier pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tn.enit.tp4.spark</groupId>
<artifactId>spark-processor</artifactId>
<version>1.0.0</version>
<name>Spark processor for speed layer</name>
<properties>
<spark-version>3.1.0</spark-version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark-version}</version>
</dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version> <!-- You can adjust the version according to your setup -->
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.14</version>
</dependency>
<!-- Spark Streaming with Kafka integration -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark-version}</version>
</dependency>
<!-- Spark Cassandra -->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<!-- Other Dependencies -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.9</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>tn.enit.tp4.processor.StreamingProcessor</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Créez sous le package tn.enit.tp4, les packages suivants:
- entity
- processor
- util
Dans le package entity, ajoutez le fichier AverageData.java:
import java.io.Serializable;
public class AverageData implements Serializable {
private String id;
private double temperature;
private double humidity;
public AverageData() {
}
public AverageData(String id, double temperature, double humidity) {
super();
this.id = id;
this.temperature = temperature;
this.humidity = humidity;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getHumidity() {
return humidity;
}
}
Dans le package entity, ajoutez le fichier Humidity.java:
import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class Humidity implements Serializable {
private String id;
private double value;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
private Date timestamp;
public Humidity() {
}
public Humidity(String id, double value, Date timestamp) {
super();
this.id = id;
this.value = value;
this.timestamp = timestamp;
}
public String getId() {
return id;
}
public double getValue() {
return value;
}
public Date getTimestamp() {
return timestamp;
}
}
Dans le package entity, ajoutez le fichier SensorData.java:
import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class SensorData implements Serializable {
private String id;
private double temperature;
private double humidity;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
private Date timestamp;
public SensorData() {
}
public SensorData(String id, double temperature, double humidity, Date timestamp) {
super();
this.id = id;
this.temperature = temperature;
this.humidity = humidity;
this.timestamp = timestamp;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getHumidity() {
return humidity;
}
public Date getTimestamp() {
return timestamp;
}
}
Dans le package entity, ajoutez le fichier Temperature.java:
import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class Temperature implements Serializable{
private String id;
private double value;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone="MST")
private Date timestamp;
public Temperature(){
}
public Temperature(String id, double value, Date timestamp) {
super();
this.id = id;
this.value = value;
this.timestamp = timestamp;
}
public String getId() {
return id;
}
public double getValue() {
return value;
}
public Date getTimestamp() {
return timestamp;
}
}
Dans le package processor, ajoutez le fichier ProcessorUtils.java:
package tn.enit.tp4.processor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaDStream;
import tn.enit.tp4.entity.AverageData;
import tn.enit.tp4.entity.Humidity;
import tn.enit.tp4.entity.SensorData;
import tn.enit.tp4.entity.Temperature;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
public class ProcessorUtils {
// Get the Spark Configuration from properties
public static SparkConf getSparkConf(Properties prop) {
SparkConf sparkConf = new SparkConf()
.setAppName(prop.getProperty("tn.enit.tp4.spark.app.name"))
.setMaster(prop.getProperty("tn.enit.tp4.spark.master"))
.set("spark.cassandra.connection.host", prop.getProperty("tn.enit.tp4.cassandra.host"))
.set("spark.cassandra.connection.port", prop.getProperty("tn.enit.tp4.cassandra.port"))
.set("spark.cassandra.auth.username", prop.getProperty("tn.enit.tp4.cassandra.username"))
.set("spark.cassandra.auth.password", prop.getProperty("tn.enit.tp4.cassandra.password"))
.set("spark.cassandra.connection.keep_alive_ms", prop.getProperty("tn.enit.tp4.cassandra.keep_alive"));
// If running locally, configure Spark driver
if ("local".equals(prop.getProperty("tn.enit.tp4.env"))) {
sparkConf.set("spark.driver.bindAddress", "127.0.0.1");
}
return sparkConf;
}
// Save Temperature data to Cassandra
public static void saveTemperatureToCassandra(final JavaDStream<Temperature> dataStream) {
System.out.println("Saving to Cassandra...");
// Map Cassandra table columns
HashMap<String, String> columnNameMappings = new HashMap<>();
columnNameMappings.put("id", "id");
columnNameMappings.put("timestamp", "timestamp");
columnNameMappings.put("value", "value");
// Save data to Cassandra
javaFunctions(dataStream).writerBuilder("sensordatakeyspace", "temperature",
CassandraJavaUtil.mapToRow(Temperature.class, columnNameMappings)).saveToCassandra();
}
// Save Humidity data to Cassandra
public static void saveHumidityToCassandra(final JavaDStream<Humidity> dataStream) {
System.out.println("Saving to Cassandra...");
// Map Cassandra table columns
HashMap<String, String> columnNameMappings = new HashMap<>();
columnNameMappings.put("id", "id");
columnNameMappings.put("timestamp", "timestamp");
columnNameMappings.put("value", "value");
// Save data to Cassandra
javaFunctions(dataStream).writerBuilder("sensordatakeyspace", "humidity",
CassandraJavaUtil.mapToRow(Humidity.class, columnNameMappings)).saveToCassandra();
}
// Save averaged data to Cassandra
public static void saveAvgToCassandra(JavaRDD<AverageData> rdd) {
CassandraJavaUtil.javaFunctions(rdd)
.writerBuilder("sensordatakeyspace", "averagedata", CassandraJavaUtil.mapToRow(AverageData.class))
.saveToCassandra();
}
// Save data to HDFS
public static void saveDataToHDFS(final JavaDStream<SensorData> dataStream, String saveFile, SparkSession sql) {
System.out.println("Saving to HDFS...");
dataStream.foreachRDD(rdd -> {
if (rdd.isEmpty()) {
return;
}
Dataset<Row> dataFrame = sql.createDataFrame(rdd, SensorData.class);
// Select and save required columns
Dataset<Row> dfStore = dataFrame.selectExpr("id", "temperature", "humidity", "timestamp");
dfStore.printSchema();
dfStore.write().mode(SaveMode.Append).parquet(saveFile);
});
}
// Transform a Row into a SensorData object
public static SensorData transformData(Row row) {
System.out.println(row);
return new SensorData(row.getString(0), row.getDouble(1), row.getDouble(2), new Date(2022, 5, 5));
}
// Run batch processing to calculate average temperature and humidity
public static List<AverageData> runBatch(SparkSession sparkSession, String saveFile) {
System.out.println("Running Batch Processing");
var dataFrame = sparkSession.read().parquet(saveFile);
System.out.println(dataFrame);
JavaRDD<SensorData> rdd = dataFrame.javaRDD().map(row -> ProcessorUtils.transformData(row));
JavaRDD<Double> temp = rdd.map(SensorData::getTemperature);
JavaRDD<Double> hum = rdd.map(SensorData::getHumidity);
double avg_temp = temp.reduce((value1, value2) -> value1 + value2);
double avg_hum = hum.reduce((value1, value2) -> value1 + value2);
long length = temp.count();
avg_temp /= length;
avg_hum /= length;
System.out.println("Avg temp : " + avg_temp);
System.out.println("Avg hum : " + avg_hum);
AverageData d = new AverageData("0", avg_temp, avg_hum);
List<AverageData> average_data_list = new ArrayList<>();
average_data_list.add(d);
return average_data_list;
}
}
Dans le package processor, ajoutez le fichier StreamProcessor.java:
package tn.enit.tp4.processor;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import tn.enit.tp4.entity.AverageData;
import tn.enit.tp4.entity.Humidity;
import tn.enit.tp4.entity.SensorData;
import tn.enit.tp4.entity.Temperature;
import tn.enit.tp4.util.SensorDataDeserializer;
import tn.enit.tp4.util.PropertyFileReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
public class StreamProcessor {
public static void main(String[] args) throws Exception {
String file = "spark-processor.properties";
Properties prop = PropertyFileReader.readPropertyFile(file);
SparkConf conf = ProcessorUtils.getSparkConf(prop);
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
JavaSparkContext sc = streamingContext.sparkContext();
streamingContext.checkpoint(prop.getProperty("tn.enit.tp4.spark.checkpoint.dir"));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getProperty("tn.enit.tp4.brokerlist"));
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SensorDataDeserializer.class);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, prop.getProperty("tn.enit.tp4.topic"));
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, prop.getProperty("tn.enit.tp4.resetType"));
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Collection<String> topics = Arrays.asList(prop.getProperty("tn.enit.tp4.topic"));
JavaInputDStream<ConsumerRecord<String, SensorData>> stream = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, SensorData>Subscribe(topics, kafkaParams));
JavaDStream<SensorData> sensordataStream = stream.map(v -> {
return v.value();
});
sensordataStream.print();
JavaDStream<Temperature> temperatureStream = sensordataStream.map(v -> {
return new Temperature(v.getId(), v.getTemperature(), v.getTimestamp());
});
temperatureStream.print();
JavaDStream<Humidity> humidityStream = sensordataStream.map(v -> {
return new Humidity(v.getId(), v.getHumidity(), v.getTimestamp());
});
// save data to cassandra => stream
ProcessorUtils.saveTemperatureToCassandra(temperatureStream);
ProcessorUtils.saveHumidityToCassandra(humidityStream);
streamingContext.start();
streamingContext.awaitTermination();
}
}
Dans le package util, créez le fichier PropertyFileReader.java:
package tn.enit.tp4.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.log4j.Logger;
/**
* Utility class to read property file
*/
public class PropertyFileReader {
private static final Logger logger = Logger.getLogger(PropertyFileReader.class);
private static Properties prop = new Properties();
public static Properties readPropertyFile(String file) throws Exception {
if (prop.isEmpty()) {
InputStream input = PropertyFileReader.class.getClassLoader().getResourceAsStream(file);
try {
prop.load(input);
} catch (IOException ex) {
logger.error(ex);
throw ex;
} finally {
if (input != null) {
input.close();
}
}
}
return prop;
}
}
Dans le package util, créez le fichier SensorDataDeserializer.java:
package tn.enit.tp4.util;
import tn.enit.tp4.entity.SensorData;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class SensorDataDeserializer implements Deserializer<SensorData> {
private static ObjectMapper objectMapper = new ObjectMapper();
public SensorData fromBytes(byte[] bytes) {
try {
return objectMapper.readValue(bytes, SensorData.class);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public SensorData deserialize(String s, byte[] bytes) {
return fromBytes((byte[]) bytes);
}
@Override
public void close() {
}
}
Dans le dossier ressources, vous ajoutez les fichiers suivants: spark-processor-local.properties
#Kafka properties
com.iot.app.kafka.zookeeper=localhost:2181
com.iot.app.kafka.brokerlist=localhost:29092
com.iot.app.kafka.topic=sensor-data
com.iot.app.kafka.resetType=earliest
#Spark properties
com.iot.app.spark.app.name=Iot Data Processor
com.iot.app.spark.master=local[*]
com.iot.app.spark.checkpoint.dir=/tmp/iot-streaming-data
com.iot.app.hdfs=/home/IdeaProjects/spark-processor/data/
com.iot.app.jar=/home/IdeaProjects/spark-processor/target/spark-processor-1.0.0.jar
#Cassandra propertis
com.iot.app.cassandra.host=127.0.0.1
com.iot.app.cassandra.port=9042
com.iot.app.cassandra.keep_alive=10000
com.iot.app.cassandra.username=cassandra
com.iot.app.cassandra.password=cassandra
# Miscellaneous
com.iot.app.env=local
et le fichier spark-processor.properties
# Kafka properties
tn.enit.tp4.kafka.zookeeper=zookeeper:2181
tn.enit.tp4.brokerlist=kafka:9092
tn.enit.tp4.topic=sensor-data
tn.enit.tp4.resetType=earliest
#Spark properties
tn.enit.tp4.spark.app.name=Iot Data Processor
tn.enit.tp4.spark.master=spark://spark-master:7077
tn.enit.tp4.spark.checkpoint.dir=hdfs://namenode:8020/lambda-arch/checkpoint
tn.enit.tp4.hdfs=hdfs://namenode:8020/lambda-arch/
tn.enit.tp4.jar=spark-processor-1.0.0.jar
#Cassandra propertis
tn.enit.tp4.cassandra.host=172.23.0.6
tn.enit.tp4.cassandra.port=9042
tn.enit.tp4.cassandra.keep_alive=10000
tn.enit.tp4.cassandra.username=cassandra
tn.enit.tp4.cassandra.password=cassandra
# Miscellaneous
tn.enit.tp4.env=cluster
L’arborescence de votre projet ressemblera finalement à celà:
Production des données avec Kafka
Attention ⚠️ Java 8 est installée sur le container du kafka! Vous avez besoin de générer un jar avec cette version. Pour ce faire, on doit mettre cette version dans le pom.xml comme l’indique le fichier suivant ⬇️.
Créez maintenant un projet kafka-producer. Dans le projet pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tn.enit.tp4.kafka</groupId>
<artifactId>kafka-producer</artifactId>
<version>1.0.0</version>
<name>Kafka Producer</name>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
</dependency>
<!-- Jackson dependencies for JSON handling -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.9</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- Compiler plugin for Java 11+ -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!-- Maven Shade Plugin for creating a fat JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>tn.enit.tp4.kafka.SensorDataProducer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Nous créeons le package tn.enit.tp4.kafka.
Nous allons créer PropertyFileReader.java:
package tn.enit.tp4.kafka;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class PropertyFileReader {
private static Properties prop = new Properties();
public static Properties readPropertyFile() throws Exception {
if (prop.isEmpty()) {
InputStream input = PropertyFileReader.class.getClassLoader().getResourceAsStream("kafka-producer.properties");
try {
prop.load(input);
} catch (IOException ex) {
System.out.println(ex.toString());
throw ex;
} finally {
if (input != null) {
input.close();
}
}
}
return prop;
}
}
Créez un fichier SensorData.java :
package tn.enit.tp4.kafka;
import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class SensorData implements Serializable {
private String id;
private double temperature;
private double humidity;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
private Date timestamp;
public SensorData() {
}
public SensorData(String id, double temperature, double humidity, Date timestamp) {
super();
this.id = id;
this.temperature = temperature;
this.humidity = humidity;
this.timestamp = timestamp;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getHumidity() {
return humidity;
}
public Date getTimestamp() {
return timestamp;
}
}
Créez le fichier SensorDataEncoder.java:
package tn.enit.tp4.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
public class SensorDataEncoder implements Encoder<SensorData> {
private static ObjectMapper objectMapper = new ObjectMapper();
public SensorDataEncoder(VerifiableProperties verifiableProperties) {
}
public byte[] toBytes(SensorData event) {
try {
String msg = objectMapper.writeValueAsString(event);
System.out.println(msg);
return msg.getBytes();
} catch (JsonProcessingException e) {
System.out.println("Error in Serialization" +e.getMessage());
}
return null;
}
}
Créez le fichier SensorDataProducer.java:
package tn.enit.tp4.kafka;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class SensorDataProducer {
private final Producer<String, SensorData> producer;
public SensorDataProducer(final Producer<String, SensorData> producer) {
this.producer = producer;
}
public static void main(String[] args) throws Exception {
Properties properties = PropertyFileReader.readPropertyFile();
Producer<String, SensorData> producer = new Producer<>(new ProducerConfig(properties));
SensorDataProducer iotProducer = new SensorDataProducer(producer);
iotProducer.generateIoTEvent(properties.getProperty("kafka.topic"));
}
private void generateIoTEvent(String topic) throws InterruptedException {
Random rand = new Random();
double init_val_temp = 20;
double init_val_hum = 80;
System.out.println("Sending events");
while (true) {
SensorData event = generateSensorData(rand, init_val_temp, init_val_hum);
System.out.println("Sent: " + event);
producer.send(new KeyedMessage<>(topic, event));
Thread.sleep(rand.nextInt(5000 - 2000) + 2000); // random delay of 2 to 5 seconds
}
}
private SensorData generateSensorData(final Random rand, double temp, double hum) {
String id = UUID.randomUUID().toString();
Date timestamp = new Date();
double t = temp + rand.nextDouble() * 10;
double h = hum + rand.nextDouble() * 10;
SensorData data = new SensorData(id, t, h, timestamp);
return data;
}
}
Dans le dossier ressources, ajoutez le fichier kafka-producer.properties:
# Kafka properties
#### if running from the host the kafka port will be 29092, when running from docker it will be 9092
zookeeper.connect=localhost:2181
metadata.broker.list=localhost:29092
request.required.acks=1
serializer.class=tn.enit.tp4.kafka.SensorDataEncoder
kafka.topic=sensor-data
L’arborescence de votre projet doit ressembler à celà à la fin :
Configurer le docker-compose
Maintenant, mettez tous les projets dans un seul dossier de cette manière (un nouveau dossier datasera créé aussi):
Créez votre fichier docker-compose.yml:
version: "3.3"
networks:
netw:
driver: bridge
ipam:
config:
- subnet: 172.23.0.0/24
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.0
hostname: zookeeper
container_name: zookeeper-iot
ports:
- 2181:2181
networks:
- netw
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:5.1.0
ports:
- 9092:9092
- 29092:29092
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.request.logger=WARN"
KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
volumes:
- /var/run/docker.sock:/var/run/docker.sock
hostname: kafka
container_name: kafka-iot
networks:
- netw
restart: always
cassandra:
image: 'bitnami/cassandra:latest'
hostname: cassandra
networks:
netw:
ipv4_address: 172.23.0.6
ports:
- "9042:9042"
environment:
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"
container_name: cassandra-iot
volumes:
- ./data/schema.cql:/schema.cql
spark-master:
image: bde2020/spark-master:3.0.0-hadoop3.2-java11
container_name: spark-master
hostname: spark-master
healthcheck:
interval: 5s
retries: 100
ports:
- "8080:8080"
- "7077:7077"
- "4040:4040"
- "4041:4041"
environment:
- INIT_DAEMON_STEP=false
- SPARK_DRIVER_HOST=192.168.1.5
volumes:
- ./spark-processor/target:/opt/spark-data
networks:
- netw
spark-worker-1:
image: bde2020/spark-worker:3.0.0-hadoop3.2-java11
container_name: spark-worker-1
hostname: spark-worker-1
depends_on:
- spark-master
ports:
- "8081:8081"
environment:
- "SPARK_MASTER=spark://spark-master:7077"
volumes:
- ./data/spark/:/opt/spark-data
networks:
- netw
namenode:
image: bde2020/hadoop-namenode:2.0.0-hadoop3.1.3-java8
container_name: namenode
hostname: namenode
volumes:
- ./data/namenode:/hadoop/dfs/name
environment:
- CLUSTER_NAME=test
- CORE_CONF_fs_defaultFS=hdfs://namenode:8020
healthcheck:
interval: 5s
retries: 100
networks:
- netw
ports:
- 9870:9870
- 8020:8020
datanode:
image: bde2020/hadoop-datanode:2.0.0-hadoop3.1.3-java8
container_name: datanode
hostname: datanode
volumes:
- ./data/datanode:/hadoop/dfs/data
environment:
- CORE_CONF_fs_defaultFS=hdfs://namenode:8020
depends_on:
- namenode
healthcheck:
interval: 5s
retries: 100
networks:
- netw
ports:
- 50075:50075
- 50010:50010
Créez un dossier data et créez le fichier schema.cql:
//Create keyspace
CREATE KEYSPACE IF NOT EXISTS sensordatakeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
//Create table
CREATE TABLE sensordatakeyspace.temperature (id text , timeStamp timestamp, value double, PRIMARY KEY (id));
CREATE TABLE sensordatakeyspace.humidity (id text , timeStamp timestamp, value double, PRIMARY KEY (id));
CREATE TABLE sensordatakeyspace.averagedata (id text , temperature double, humidity double, PRIMARY KEY (id));
Pour exécuter votre speed layer sur containers, lancez:
docker-compose up -d
⚠️ Attenion ! Si un container est appelé à utiliser un port déjà pris par une autre application, ce port ne peut pas être exploité. Exemple, si vous tentez de monter le container spark-master alos que son port 8081 est occupé, vous devez lancer cette commande pour savoir qui utilise ce port:
sudo lsof -i :8081Une fois identifié, vous lancez
sudo kill -9 <PID>
Il faut créer tout d’abord les dossiers suivants sur HDFS. Ces dossiers sont hyperimportants pour l’enregistrement des DAG du Spark Streaming.
docker exec namenode hdfs dfs -rm -r /lambda-arch
docker exec namenode hdfs dfs -mkdir -p /lambda-arch
docker exec namenode hdfs dfs -mkdir -p /lambda-arch/checkpoint
docker exec namenode hdfs dfs -chmod -R 777 /lambda-arch
docker exec namenode hdfs dfs -chown -R 777 /lambda-arch
docker exec namenode hdfs dfs -chmod -R 777 /lambda-arch/checkpoint
docker exec namenode hdfs dfs -chown -R 777 /lambda-arch/checkpoint
Vous devez aussi lancer la création de schema sous cassandra:
docker exec cassandra-iot cqlsh --username cassandra --password cassandra -f /schema.cql
Lancez le “Kafka Data production” en lançant depuis docker en chargeant le fichier jar sur votre container. Placez-vous sous le dossier target du projet kafka-producer et lancez:
docker cp kafka-producer-1.0.0.jar kafka-iot:/
Puis:
docker exec -it kafka-iot java -jar kafka-producer-1.0.0.jar
Vous obtiendrez l’affichage suivant :
Nous faisons de même pour le container spark-masteren chargeant le fichier jar qui se trouve dans le dossier du projet spark-processer avec la commande:
docker cp spark-processor-1.0.0.jar spark-master:/
Lancez le Stream Processor:
docker exec spark-master /spark/bin/spark-submit --class tn.enit.tp4.processor.StreamProcessor /spark-processor-1.0.0.jar
Afin de visualiser les résultats, connectez-vous à cassandra:
docker exec -it cassandra-iot cqlsh -u cassandra -p cassandra
Puis lancez:
DESCRIBE KEYSPACES;
SELECT * FROM sensordatakeyspace.temperature;
SELECT * FROM sensordatakeyspace.humidity;
Vous obtiendrez des résultats similaires à ceux-là ⬇️
A vous de jouer
Reportez le traitement effectué au niveau de la manipulation 2 sur vos propores données.