TP4 Speed Processing avec Spark Streaming, Kafka and Persisting using Cassandra

Quatrième TP

By Mariem ZAOUALI

TP4 : Traitement en temps réel avec Spark Streaming, Kafka et persistance avec Cassandra

Objectifs du TP : Au terme de ce TP, vous seriez capable de :

  • Manipuler les structures de Spark à savoir RDD et Dataframes
  • Tester Spark en mode cluster sans YARN
  • Implémenter la pipeline HDFS, Spark, Hive et Sqoop pour effectuer un batch processing

Spark Streaming : le traitement en temps réel

Spark Streaming est une extension de l’API principale de Spark. Elle permet de créer des applications de streaming évolutives, à haut débit et tolérantes aux pannes, tout cela sans avoir à utiliser un ensemble d’outils complètement différent et en restant dans l’environnement de Spark.

Définition en EN

Si vous considérez Spark comme une plateforme pour travailler avec des données par lots, alors Spark Streaming étend ce concept pour travailler avec des données en unités beaucoup plus petites, mais d’une manière très similaire. Nous appelons cela le micro-batching.

Définition en EN

Spark Streaming peut fonctionner avec une liste impressionnante de sources de données de streaming préconfigurées, Kafka étant probablement la plus importante. Nous parlerons de Kafka plus en détail dans le module suivant. Vous remarquerez également qu’AKKA est présenté différemment. C’est parce qu’il s’agit de la forme la plus brute d’un fournisseur de flux. Spark fournit les interfaces et les connexions nécessaires pour que vous puissiez créer votre propre fournisseur de streaming en utilisant AKKA comme plateforme de messagerie. Spark utilise AKKA pour son implémentation, il est donc naturel de fournir une telle fonctionnalité.

Quoi qu’il en soit, Spark propose une longue liste de sources préconfigurées en plus de celles fournies par la communauté, que vous pouvez adopter, et offre des moyens d’agir sur les données reçues. Par exemple, vous pouvez sauvegarder les résultats dans HDFS ou dans des sources de données externes telles qu’Oracle, SQL Server, Elasticsearch, Cassandra, et bien d’autres.

Au cœur de Spark Streaming se trouve une classe appelée DStream. Un DStream n’est qu’une collection de RDD avec des informations temporelles. Il est également accompagné de fonctions supplémentaires, par exemple, pour maintenir l’état à mesure que votre application de streaming progresse et pour permettre des calculs de fenêtrage.

Prenons un moment pour comprendre comment fonctionne Spark Streaming. Nous avons évidemment besoin d’un flux de données en entrée qui alimente une application Spark Streaming. Il y a évidemment un processus qui permet à Spark Streaming d’accéder à ce jeu de données en entrée, mais nous n’allons pas nous concentrer là-dessus pour l’instant. Retenez simplement qu’il y a un processus représenté ici par une ligne pointillée verticale dans la diapositive, et ce processus est responsable de fournir les données. Il existe également un composant appelé block manager dans Spark, qui garde une trace de ces données lorsqu’elles arrivent dans l’univers Spark.

Dans chaque application de streaming, l’application doit définir un intervalle de lot. Il s’agit essentiellement de l’intervalle de micro-batch dont nous parlions plus tôt. Plus l’intervalle est court, plus la latence entre la disponibilité des données en entrée et le début de leur traitement est faible. Cependant, cela ne définit pas nécessairement le temps nécessaire pour terminer le traitement. Idéalement, le traitement se termine dans chaque intervalle de lot. L’intervalle agit simplement comme un déclencheur pour lancer les mêmes étapes de traitement sur le prochain lot de données. Cependant, si le traitement prend plus de temps que l’intervalle de lot, alors le traitement s’accumule. Si cela perdure, l’application devient instable et finit par tomber en panne.

Le point ici est qu’il y a plus d’implications à choisir un intervalle de lot que simplement contrôler la latence. Votre traitement doit être terminé dans ce délai. Le meilleur moyen de vérifier cela est de tester et de résoudre les goulots d’étranglement dans votre code en consultant l’interface utilisateur de Spark Streaming.

Lorsque vos données arrivent, elles sont placées en blocs ou en partitions. Un paramètre de configuration de Spark, appelé spark.streaming.blockInterval, contrôle l’intervalle auquel les données reçues par les récepteurs de Spark Streaming sont fragmentées en blocs ou partitions. Ces partitions sont liées au temps, et non à la taille. La collection de partitions dans un intervalle de temps similaire basé sur l’intervalle de lot crée collectivement un RDD. Une fois qu’un RDD est créé à partir d’un lot, il est mis en file d’attente et programmé pour exécution par le driver, qui, tout comme dans une application Spark normale, planifie l’exécution de votre code, des transformations et des actions contre les exécuteurs Spark.

Kafka: Distributed Stream Process

Manipulation 1: Manipulation kafka data production et Spark

Vous allez mettre votre fichier docker-compose.yml à jour en ajoutant les conteneurs kafka-iot et zookeeper-iot

zookeeper:
      image: confluentinc/cp-zookeeper:5.1.0
      hostname: zookeeper
      container_name: zookeeper-iot
      ports:
        - 2181:2181
      networks:
        - net
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
      image: confluentinc/cp-kafka:5.1.0
      ports:
        - 9092:9092
        - 29092:29092
      depends_on:
        - zookeeper
      environment:
        KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.request.logger=WARN"
        KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      volumes:
        - /var/run/docker.sock:/var/run/docker.sock
      hostname: kafka
      container_name: kafka-iot
      networks:
        - net
      restart: always

Placez-vous là vous avez votre docker-compose.xml et lancez :

docker compose up -d

Pour supprimer le réseau, lancez la commande

docker compose down

Nous allons maintenant tester une logique de Producteur d’information qui est dans notre cas « Kafka » et un Consommateur qui est dans notre cas « KafkaWordCount.jar » lancé sur spark. Si jamais, vous n’avez pas installer maven sur le terminal, lancez :

apt-get install maven

Pour ce faire nous allons créer notre projet java mavenisé :

mvn archetype:generate -DgroupId=tn.enit.tp4 -DartifactId=kafka_wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Naviguez vers l’emplacement du fichier pom.xml, et mettre ce contenu :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tn.enit.tp4</groupId>
    <artifactId>kafka_wordcount</artifactId>
    <version>1</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!--
                         Bind the maven-assembly-plugin to the package phase
              this will create a jar file without the storm dependencies
              suitable for deployment to a cluster.
             -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>tn.enit.tp4.KafkaWordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Maintenant, placez-vous sous /src/main/java/tn/enit/tp4, effacez le fichier existant et remplacez-le par KafkaWordCount.java

package tn.enit.tp4;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class KafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private KafkaWordCount() {
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: SparkKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
        // Creer le contexte avec une taille de batch de 2 secondes
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
            new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic: topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(Tuple2::_2);

        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

        JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(s -> new Tuple2<>(s, 1))
                     .reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

Lancez maintenant la commande suivante tout en veillant à revenir là où il y a le fichier pom.xml.

mvn clean package

Cette commande vous générera le fichier executable jar que nous le copions sous le conteneur spark-master :

docker cp target/kafka_wordcount-1-jar-with-dependencies.jar spark-master:/

Maintenant, connectez-vous à spark-master

docker exec -it spark-master bash

Et lancez:

/spark/bin/spark-submit --class tn.enit.tp4.KafkaWordCount --master local[2] kafka_wordcount-1-jar-with-dependencies.jar zookeeper:2181 test Hello-Kafka 1 >> out

Laisser ce console et allez vous connecter au conteneur kafka-iot avec

docker exec -it kafka-iot bash

Placez-vous sous /usr/bin Lancez maintenant :

kafka-console-producer --broker-list localhost:9092 --topic Hello-Kafka

Envoyez votre message en l’écrivant sur le container de kafka. Maintenant, quittez kafka container et connectez-vous au container spark-master puis lancez

cat out

pour voir le résultat du traitement du message envoyé par kafka comme indiqué dans la figure.

image

Manipulation 2 : Speed Layer : Spark Streaming, kafka et cassandra

Nous allons créer 3 projets pour mettre à niveau la couche Speed Layer de notre architecture Lambda. Chaque projet sera lancé depuis un conteneur. Commençons tout d’abord par le Streaming Job assuré par Spark Streaming. Créer un projet spark-processor.

Traitement avec Spark Streaming

Copiez le fichier pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tn.enit.tp4.spark</groupId>
    <artifactId>spark-processor</artifactId>
    <version>1.0.0</version>
    <name>Spark processor for speed layer</name>

    <properties>
        <spark-version>3.1.0</spark-version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>

        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark-version}</version>
        </dependency>

        <!-- Kafka Clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version> <!-- You can adjust the version according to your setup -->
        </dependency>
        <dependency>
            <groupId>com.github.jnr</groupId>
            <artifactId>jnr-posix</artifactId>
            <version>3.1.14</version> 
        </dependency>

        <!-- Spark Streaming with Kafka integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark-version}</version>
        </dependency>

        <!-- Spark Cassandra -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <!-- Other Dependencies -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.10.9</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>tn.enit.tp4.processor.StreamingProcessor</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

image

Créez sous le package tn.enit.tp4, les packages suivants:

  • entity
  • processor
  • util

Dans le package entity, ajoutez le fichier AverageData.java:

import java.io.Serializable;

public class AverageData implements Serializable {

	private String id;
	private double temperature;

	private double humidity;

	public AverageData() {

	}

	public AverageData(String id, double temperature, double humidity) {
		super();
		this.id = id;
		this.temperature = temperature;
		this.humidity = humidity;
	}

	public String getId() {
		return id;
	}

	public double getTemperature() {
		return temperature;
	}

	public double getHumidity() {
		return humidity;
	}
}

Dans le package entity, ajoutez le fichier Humidity.java:

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Humidity implements Serializable {

	private String id;
	private double value;
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
	private Date timestamp;

	public Humidity() {

	}

	public Humidity(String id, double value, Date timestamp) {
		super();
		this.id = id;
		this.value = value;
		this.timestamp = timestamp;
	}

	public String getId() {
		return id;
	}

	public double getValue() {
		return value;
	}

	public Date getTimestamp() {
		return timestamp;
	}

}

Dans le package entity, ajoutez le fichier SensorData.java:

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class SensorData implements Serializable {

	private String id;
	private double temperature;
	private double humidity;
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
	private Date timestamp;

	public SensorData() {

	}

	public SensorData(String id, double temperature, double humidity, Date timestamp) {
		super();
		this.id = id;
		this.temperature = temperature;
		this.humidity = humidity;
		this.timestamp = timestamp;
	}

	public String getId() {
		return id;
	}

	public double getTemperature() {
		return temperature;
	}

	public double getHumidity() {
		return humidity;
	}

	public Date getTimestamp() {
		return timestamp;
	}

}

Dans le package entity, ajoutez le fichier Temperature.java:

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Temperature implements Serializable{
	
	private String id;
	private double value;
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone="MST")
	private Date timestamp;
	
	public Temperature(){
		
	}

	public Temperature(String id, double value, Date timestamp) {
		super();
		this.id = id;
		this.value = value;
		this.timestamp = timestamp;
	}

	public String getId() {
		return id;
	}

	public double getValue() {
		return value;
	}

	public Date getTimestamp() {
		return timestamp;
	}

}

Dans le package processor, ajoutez le fichier ProcessorUtils.java:

package tn.enit.tp4.processor;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaDStream;
import tn.enit.tp4.entity.AverageData;
import tn.enit.tp4.entity.Humidity;
import tn.enit.tp4.entity.SensorData;
import tn.enit.tp4.entity.Temperature;
import com.datastax.spark.connector.japi.CassandraJavaUtil;

import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;

import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

public class ProcessorUtils {

    // Get the Spark Configuration from properties
    public static SparkConf getSparkConf(Properties prop) {
        SparkConf sparkConf = new SparkConf()
                .setAppName(prop.getProperty("tn.enit.tp4.spark.app.name"))
                .setMaster(prop.getProperty("tn.enit.tp4.spark.master"))
                .set("spark.cassandra.connection.host", prop.getProperty("tn.enit.tp4.cassandra.host"))
                .set("spark.cassandra.connection.port", prop.getProperty("tn.enit.tp4.cassandra.port"))
                .set("spark.cassandra.auth.username", prop.getProperty("tn.enit.tp4.cassandra.username"))
                .set("spark.cassandra.auth.password", prop.getProperty("tn.enit.tp4.cassandra.password"))
                .set("spark.cassandra.connection.keep_alive_ms", prop.getProperty("tn.enit.tp4.cassandra.keep_alive"));


        // If running locally, configure Spark driver
        if ("local".equals(prop.getProperty("tn.enit.tp4.env"))) {
            sparkConf.set("spark.driver.bindAddress", "127.0.0.1");
        }
        return sparkConf;
    }

    // Save Temperature data to Cassandra
    public static void saveTemperatureToCassandra(final JavaDStream<Temperature> dataStream) {
        System.out.println("Saving to Cassandra...");

        // Map Cassandra table columns
        HashMap<String, String> columnNameMappings = new HashMap<>();
        columnNameMappings.put("id", "id");
        columnNameMappings.put("timestamp", "timestamp");
        columnNameMappings.put("value", "value");

        // Save data to Cassandra
        javaFunctions(dataStream).writerBuilder("sensordatakeyspace", "temperature",
                CassandraJavaUtil.mapToRow(Temperature.class, columnNameMappings)).saveToCassandra();
    }

    // Save Humidity data to Cassandra
    public static void saveHumidityToCassandra(final JavaDStream<Humidity> dataStream) {
        System.out.println("Saving to Cassandra...");

        // Map Cassandra table columns
        HashMap<String, String> columnNameMappings = new HashMap<>();
        columnNameMappings.put("id", "id");
        columnNameMappings.put("timestamp", "timestamp");
        columnNameMappings.put("value", "value");

        // Save data to Cassandra
        javaFunctions(dataStream).writerBuilder("sensordatakeyspace", "humidity",
                CassandraJavaUtil.mapToRow(Humidity.class, columnNameMappings)).saveToCassandra();
    }

    // Save averaged data to Cassandra
    public static void saveAvgToCassandra(JavaRDD<AverageData> rdd) {
        CassandraJavaUtil.javaFunctions(rdd)
                .writerBuilder("sensordatakeyspace", "averagedata", CassandraJavaUtil.mapToRow(AverageData.class))
                .saveToCassandra();
    }

    // Save data to HDFS
    public static void saveDataToHDFS(final JavaDStream<SensorData> dataStream, String saveFile, SparkSession sql) {
        System.out.println("Saving to HDFS...");

        dataStream.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                return;
            }
            Dataset<Row> dataFrame = sql.createDataFrame(rdd, SensorData.class);

            // Select and save required columns
            Dataset<Row> dfStore = dataFrame.selectExpr("id", "temperature", "humidity", "timestamp");
            dfStore.printSchema();
            dfStore.write().mode(SaveMode.Append).parquet(saveFile);
        });
    }

    // Transform a Row into a SensorData object
    public static SensorData transformData(Row row) {
        System.out.println(row);
        return new SensorData(row.getString(0), row.getDouble(1), row.getDouble(2), new Date(2022, 5, 5));
    }

    // Run batch processing to calculate average temperature and humidity
    public static List<AverageData> runBatch(SparkSession sparkSession, String saveFile) {
        System.out.println("Running Batch Processing");

        var dataFrame = sparkSession.read().parquet(saveFile);
        System.out.println(dataFrame);
        JavaRDD<SensorData> rdd = dataFrame.javaRDD().map(row -> ProcessorUtils.transformData(row));

        JavaRDD<Double> temp = rdd.map(SensorData::getTemperature);
        JavaRDD<Double> hum = rdd.map(SensorData::getHumidity);

        double avg_temp = temp.reduce((value1, value2) -> value1 + value2);
        double avg_hum = hum.reduce((value1, value2) -> value1 + value2);

        long length = temp.count();

        avg_temp /= length;
        avg_hum /= length;

        System.out.println("Avg temp : " + avg_temp);
        System.out.println("Avg hum : " + avg_hum);

        AverageData d = new AverageData("0", avg_temp, avg_hum);
        List<AverageData> average_data_list = new ArrayList<>();
        average_data_list.add(d);

        return average_data_list;
    }
}

Dans le package processor, ajoutez le fichier StreamProcessor.java:

package tn.enit.tp4.processor;

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;

import tn.enit.tp4.entity.AverageData;
import tn.enit.tp4.entity.Humidity;
import tn.enit.tp4.entity.SensorData;
import tn.enit.tp4.entity.Temperature;
import tn.enit.tp4.util.SensorDataDeserializer;

import tn.enit.tp4.util.PropertyFileReader;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StreamProcessor {

    public static void main(String[] args) throws Exception {

        String file = "spark-processor.properties";
        Properties prop = PropertyFileReader.readPropertyFile(file);

        SparkConf conf = ProcessorUtils.getSparkConf(prop);

        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
        JavaSparkContext sc = streamingContext.sparkContext();


        streamingContext.checkpoint(prop.getProperty("tn.enit.tp4.spark.checkpoint.dir"));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getProperty("tn.enit.tp4.brokerlist"));
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SensorDataDeserializer.class);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, prop.getProperty("tn.enit.tp4.topic"));
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, prop.getProperty("tn.enit.tp4.resetType"));
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        Collection<String> topics = Arrays.asList(prop.getProperty("tn.enit.tp4.topic"));

        JavaInputDStream<ConsumerRecord<String, SensorData>> stream = KafkaUtils.createDirectStream(streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, SensorData>Subscribe(topics, kafkaParams));

        JavaDStream<SensorData> sensordataStream = stream.map(v -> {
            return v.value();
        });

        sensordataStream.print();

        JavaDStream<Temperature> temperatureStream = sensordataStream.map(v -> {
            return new Temperature(v.getId(), v.getTemperature(), v.getTimestamp());
        });
        temperatureStream.print();

        JavaDStream<Humidity> humidityStream = sensordataStream.map(v -> {
            return new Humidity(v.getId(), v.getHumidity(), v.getTimestamp());
        });

        // save data to cassandra => stream
        ProcessorUtils.saveTemperatureToCassandra(temperatureStream);

        ProcessorUtils.saveHumidityToCassandra(humidityStream);





        streamingContext.start();
        streamingContext.awaitTermination();

    }

}

Dans le package util, créez le fichier PropertyFileReader.java:

package tn.enit.tp4.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.Logger;

/**
 * Utility class to read property file
 */
public class PropertyFileReader {

    private static final Logger logger = Logger.getLogger(PropertyFileReader.class);

    private static Properties prop = new Properties();

    public static Properties readPropertyFile(String file) throws Exception {
        if (prop.isEmpty()) {
            InputStream input = PropertyFileReader.class.getClassLoader().getResourceAsStream(file);
            try {
                prop.load(input);
            } catch (IOException ex) {
                logger.error(ex);
                throw ex;
            } finally {
                if (input != null) {
                    input.close();
                }
            }
        }
        return prop;
    }
}

Dans le package util, créez le fichier SensorDataDeserializer.java:

package tn.enit.tp4.util;

import tn.enit.tp4.entity.SensorData;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;


public class SensorDataDeserializer implements Deserializer<SensorData> {

    private static ObjectMapper objectMapper = new ObjectMapper();

    public SensorData fromBytes(byte[] bytes) {
        try {
            return objectMapper.readValue(bytes, SensorData.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public SensorData deserialize(String s, byte[] bytes) {
        return fromBytes((byte[]) bytes);
    }

    @Override
    public void close() {

    }
}

Dans le dossier ressources, vous ajoutez les fichiers suivants: spark-processor-local.properties

#Kafka  properties
com.iot.app.kafka.zookeeper=localhost:2181
com.iot.app.kafka.brokerlist=localhost:29092
com.iot.app.kafka.topic=sensor-data
com.iot.app.kafka.resetType=earliest

#Spark properties
com.iot.app.spark.app.name=Iot Data Processor
com.iot.app.spark.master=local[*]
com.iot.app.spark.checkpoint.dir=/tmp/iot-streaming-data
com.iot.app.hdfs=/home/IdeaProjects/spark-processor/data/
com.iot.app.jar=/home/IdeaProjects/spark-processor/target/spark-processor-1.0.0.jar

#Cassandra propertis
com.iot.app.cassandra.host=127.0.0.1
com.iot.app.cassandra.port=9042
com.iot.app.cassandra.keep_alive=10000
com.iot.app.cassandra.username=cassandra
com.iot.app.cassandra.password=cassandra

# Miscellaneous
com.iot.app.env=local

et le fichier spark-processor.properties

# Kafka  properties
tn.enit.tp4.kafka.zookeeper=zookeeper:2181
tn.enit.tp4.brokerlist=kafka:9092
tn.enit.tp4.topic=sensor-data
tn.enit.tp4.resetType=earliest

#Spark properties
tn.enit.tp4.spark.app.name=Iot Data Processor
tn.enit.tp4.spark.master=spark://spark-master:7077
tn.enit.tp4.spark.checkpoint.dir=hdfs://namenode:8020/lambda-arch/checkpoint
tn.enit.tp4.hdfs=hdfs://namenode:8020/lambda-arch/
tn.enit.tp4.jar=spark-processor-1.0.0.jar

#Cassandra propertis
tn.enit.tp4.cassandra.host=172.23.0.6
tn.enit.tp4.cassandra.port=9042
tn.enit.tp4.cassandra.keep_alive=10000
tn.enit.tp4.cassandra.username=cassandra
tn.enit.tp4.cassandra.password=cassandra

# Miscellaneous
tn.enit.tp4.env=cluster

L’arborescence de votre projet ressemblera finalement à celà: image

Production des données avec Kafka

Attention ⚠️ Java 8 est installée sur le container du kafka! Vous avez besoin de générer un jar avec cette version. Pour ce faire, on doit mettre cette version dans le pom.xml comme l’indique le fichier suivant ⬇️.

Créez maintenant un projet kafka-producer. Dans le projet pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tn.enit.tp4.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>1.0.0</version>
    <name>Kafka Producer</name>

    <dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.0</version>
        </dependency>


        <!-- Jackson dependencies for JSON handling -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.15.2</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.9</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <!-- Compiler plugin for Java 11+ -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <!-- Maven Shade Plugin for creating a fat JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>tn.enit.tp4.kafka.SensorDataProducer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Nous créeons le package tn.enit.tp4.kafka. Nous allons créer PropertyFileReader.java:

package tn.enit.tp4.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertyFileReader {
    private static Properties prop = new Properties();
    public static Properties readPropertyFile() throws Exception {
        if (prop.isEmpty()) {
            InputStream input = PropertyFileReader.class.getClassLoader().getResourceAsStream("kafka-producer.properties");
            try {
                prop.load(input);
            } catch (IOException ex) {
                System.out.println(ex.toString());
                throw ex;
            } finally {
                if (input != null) {
                    input.close();
                }
            }
        }
        return prop;
    }
}

Créez un fichier SensorData.java :

package tn.enit.tp4.kafka;

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class SensorData implements Serializable {

    private String id;
    private double temperature;
    private double humidity;
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
    private Date timestamp;

    public SensorData() {

    }

    public SensorData(String id, double temperature, double humidity, Date timestamp) {
        super();
        this.id = id;
        this.temperature = temperature;
        this.humidity = humidity;
        this.timestamp = timestamp;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getHumidity() {
        return humidity;
    }

    public Date getTimestamp() {
        return timestamp;
    }

}

Créez le fichier SensorDataEncoder.java:

package tn.enit.tp4.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class SensorDataEncoder implements Encoder<SensorData> {

    private static ObjectMapper objectMapper = new ObjectMapper();
    public SensorDataEncoder(VerifiableProperties verifiableProperties) {

    }
    public byte[] toBytes(SensorData event) {
        try {
            String msg = objectMapper.writeValueAsString(event);
            System.out.println(msg);
            return msg.getBytes();
        } catch (JsonProcessingException e) {
            System.out.println("Error in Serialization" +e.getMessage());
        }
        return null;
    }
}

Créez le fichier SensorDataProducer.java:

package tn.enit.tp4.kafka;

import java.util.Date;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class SensorDataProducer {

    private final Producer<String, SensorData> producer;

    public SensorDataProducer(final Producer<String, SensorData> producer) {
        this.producer = producer;
    }

    public static void main(String[] args) throws Exception {

        Properties properties = PropertyFileReader.readPropertyFile();
        Producer<String, SensorData> producer = new Producer<>(new ProducerConfig(properties));
        SensorDataProducer iotProducer = new SensorDataProducer(producer);
        iotProducer.generateIoTEvent(properties.getProperty("kafka.topic"));
    }

    private void generateIoTEvent(String topic) throws InterruptedException {
        Random rand = new Random();
        double init_val_temp = 20;
        double init_val_hum = 80;
        System.out.println("Sending events");

        while (true) {
            SensorData event = generateSensorData(rand, init_val_temp, init_val_hum);
            System.out.println("Sent: " + event);
            producer.send(new KeyedMessage<>(topic, event));
            Thread.sleep(rand.nextInt(5000 - 2000) + 2000); // random delay of 2 to 5 seconds
        }
    }

    private SensorData generateSensorData(final Random rand, double temp, double hum) {
        String id = UUID.randomUUID().toString();
        Date timestamp = new Date();
        double t = temp + rand.nextDouble() * 10;
        double h = hum + rand.nextDouble() * 10;

        SensorData data = new SensorData(id, t, h, timestamp);
        return data;
    }
}


Dans le dossier ressources, ajoutez le fichier kafka-producer.properties:

# Kafka  properties
#### if running from the host the kafka port will be 29092, when running from docker it will be 9092
zookeeper.connect=localhost:2181
metadata.broker.list=localhost:29092
request.required.acks=1
serializer.class=tn.enit.tp4.kafka.SensorDataEncoder
kafka.topic=sensor-data

L’arborescence de votre projet doit ressembler à celà à la fin : image

Configurer le docker-compose

Maintenant, mettez tous les projets dans un seul dossier de cette manière (un nouveau dossier datasera créé aussi): image

Créez votre fichier docker-compose.yml:

version: "3.3"


networks:
  netw:
    driver: bridge
    ipam:
      config:
        - subnet: 172.23.0.0/24

services:

  zookeeper:
      image: confluentinc/cp-zookeeper:5.1.0
      hostname: zookeeper
      container_name: zookeeper-iot
      ports:
        - 2181:2181
      networks:
        - netw
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
      image: confluentinc/cp-kafka:5.1.0
      ports:
        - 9092:9092
        - 29092:29092
      depends_on:
        - zookeeper
      environment:
        KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.request.logger=WARN"
        KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      volumes:
        - /var/run/docker.sock:/var/run/docker.sock
      hostname: kafka
      container_name: kafka-iot
      networks:
        - netw
      restart: always

  cassandra:
    image: 'bitnami/cassandra:latest'
    hostname: cassandra
    networks:
      netw:
        ipv4_address: 172.23.0.6
    ports:
      - "9042:9042"
    environment:
      - "MAX_HEAP_SIZE=256M"
      - "HEAP_NEWSIZE=128M"
    container_name: cassandra-iot
    volumes:
      - ./data/schema.cql:/schema.cql

  spark-master:
    image: bde2020/spark-master:3.0.0-hadoop3.2-java11
    container_name: spark-master
    hostname: spark-master
    healthcheck:
      interval: 5s
      retries: 100
    ports:
      - "8080:8080"
      - "7077:7077"
      - "4040:4040"
      - "4041:4041"
    environment:
      - INIT_DAEMON_STEP=false
      - SPARK_DRIVER_HOST=192.168.1.5
    volumes:
      - ./spark-processor/target:/opt/spark-data
    networks:
      - netw
  spark-worker-1:
    image: bde2020/spark-worker:3.0.0-hadoop3.2-java11
    container_name: spark-worker-1
    hostname: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
      - ./data/spark/:/opt/spark-data
    networks:
      - netw
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.1.3-java8
    container_name: namenode
    hostname: namenode
    volumes:
    - ./data/namenode:/hadoop/dfs/name
    environment:
    - CLUSTER_NAME=test
    - CORE_CONF_fs_defaultFS=hdfs://namenode:8020
    healthcheck:
      interval: 5s
      retries: 100
    networks:
    - netw
    ports:
    - 9870:9870
    - 8020:8020
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.1.3-java8
    container_name: datanode
    hostname: datanode
    volumes:
    - ./data/datanode:/hadoop/dfs/data
    environment:
    - CORE_CONF_fs_defaultFS=hdfs://namenode:8020
    depends_on:
      - namenode
    healthcheck:
      interval: 5s
      retries: 100
    networks:
     - netw
    ports:
     - 50075:50075
     - 50010:50010

Créez un dossier data et créez le fichier schema.cql:

//Create keyspace
CREATE KEYSPACE IF NOT EXISTS sensordatakeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

//Create table
CREATE TABLE sensordatakeyspace.temperature (id text , timeStamp timestamp, value double, PRIMARY KEY (id));

CREATE TABLE sensordatakeyspace.humidity (id text , timeStamp timestamp, value double, PRIMARY KEY (id));

CREATE TABLE sensordatakeyspace.averagedata (id text , temperature double, humidity double, PRIMARY KEY (id));

Pour exécuter votre speed layer sur containers, lancez:

docker-compose up -d

⚠️ Attenion ! Si un container est appelé à utiliser un port déjà pris par une autre application, ce port ne peut pas être exploité. Exemple, si vous tentez de monter le container spark-master alos que son port 8081 est occupé, vous devez lancer cette commande pour savoir qui utilise ce port:

sudo lsof -i :8081

Une fois identifié, vous lancez sudo kill -9 <PID>

Il faut créer tout d’abord les dossiers suivants sur HDFS. Ces dossiers sont hyperimportants pour l’enregistrement des DAG du Spark Streaming.

docker exec namenode hdfs dfs -rm -r /lambda-arch
docker exec namenode hdfs dfs -mkdir -p /lambda-arch
docker exec namenode hdfs dfs -mkdir -p /lambda-arch/checkpoint
docker exec namenode hdfs dfs -chmod -R 777 /lambda-arch
docker exec namenode hdfs dfs -chown -R 777 /lambda-arch
docker exec namenode hdfs dfs -chmod -R 777 /lambda-arch/checkpoint
docker exec namenode hdfs dfs -chown -R 777 /lambda-arch/checkpoint

Vous devez aussi lancer la création de schema sous cassandra:

docker exec cassandra-iot cqlsh --username cassandra --password cassandra -f /schema.cql

Lancez le “Kafka Data production” en lançant depuis docker en chargeant le fichier jar sur votre container. Placez-vous sous le dossier target du projet kafka-producer et lancez:

docker cp kafka-producer-1.0.0.jar kafka-iot:/

Puis:

docker exec -it kafka-iot java -jar kafka-producer-1.0.0.jar

Vous obtiendrez l’affichage suivant : image

Nous faisons de même pour le container spark-masteren chargeant le fichier jar qui se trouve dans le dossier du projet spark-processer avec la commande:

 docker cp spark-processor-1.0.0.jar spark-master:/

Lancez le Stream Processor:

docker exec spark-master /spark/bin/spark-submit --class tn.enit.tp4.processor.StreamProcessor /spark-processor-1.0.0.jar

Afin de visualiser les résultats, connectez-vous à cassandra:

docker exec -it cassandra-iot cqlsh -u cassandra -p cassandra

Puis lancez:

DESCRIBE KEYSPACES;
SELECT * FROM sensordatakeyspace.temperature;
SELECT * FROM sensordatakeyspace.humidity;

Vous obtiendrez des résultats similaires à ceux-là ⬇️ image

A vous de jouer

Reportez le traitement effectué au niveau de la manipulation 2 sur vos propores données.