TP4: Putting it all together: the Lambda architecture

Fourth lab

By Mariem ZAOUALI

Lab objectives: by the end of this lab, you will be able to implement a project that puts the Lambda architecture into practice, with its three layers:

  • Batch processing Layer
  • Speed processing Layer
  • Serving Layer
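
To make the role of the three layers concrete, here is a minimal, self-contained Java sketch. It is not part of the lab projects, and every class and method name in it is hypothetical. It assumes that the batch layer exposes a precomputed sum and count, that the speed layer holds the readings received since the last batch run, and that the serving layer merges both to answer a query for the average temperature.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the Lambda architecture; not part of the lab code.
final class LambdaSketch {

    // Batch view: an aggregate precomputed over the full historical data set.
    static final class BatchView {
        final double sum;
        final long count;

        BatchView(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Batch layer: recomputes the view from scratch over all stored readings.
    static BatchView runBatch(List<Double> history) {
        double sum = 0.0;
        for (double value : history) {
            sum += value;
        }
        return new BatchView(sum, history.size());
    }

    // Serving layer: merges the batch view with the readings it does not cover yet.
    static double averageTemperature(BatchView batch, List<Double> recent) {
        double sum = batch.sum;
        long count = batch.count;
        for (double value : recent) {
            sum += value;
            count++;
        }
        if (count == 0) {
            throw new IllegalStateException("no readings available");
        }
        return sum / count;
    }

    public static void main(String[] args) {
        BatchView batch = runBatch(Arrays.asList(20.0, 22.0)); // historical data
        List<Double> speed = Arrays.asList(24.0);              // data since the last batch run
        System.out.println(averageTemperature(batch, speed));  // prints 22.0
    }
}
```

In the lab itself, Spark plays the batch and speed roles and Cassandra stores the views; the sketch only shows how the two views combine.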

Exercise 1: Speed Layer: Spark Streaming, Kafka, and Cassandra

We will create 3 projects to set up the Speed Layer of our Lambda architecture. Each project will be run from a container. Let's start with the streaming job, which is handled by Spark Streaming. To do so, first update your docker-compose.yml by adding the kafka and cassandra containers.

  spark-master:
    image: bitnami/spark:3.5.1
    container_name: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_PORT=7077
      - SPARK_MASTER_WEBUI_PORT=8080
    ports:
      - "7077:7077"
      - "8080:8080"
    networks:
      - net

  spark-worker-1:
    image: bitnami/spark:3.5.1
    container_name: spark-worker-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
    ports:
      - "8081:8081"
    depends_on:
      - spark-master
    networks:
      - net

  spark-worker-2:
    image: bitnami/spark:3.5.1
    container_name: spark-worker-2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
    ports:
      - "8082:8081"
    depends_on:
      - spark-master
    networks:
      - net
  kafka:
    image: apache/kafka:3.7.1
    container_name: kafka
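    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:19093"
      # Two client listeners: PLAINTEXT for containers on the "net" network (kafka:9092),
      # PLAINTEXT_HOST for clients running on the host machine (localhost:29092).
      KAFKA_LISTENERS: "PLAINTEXT://:9092,PLAINTEXT_HOST://:29092,CONTROLLER://:19093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER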
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    networks:
      - net
    volumes:
      - kafka_data:/tmp/kraft-combined-logs

  cassandra:
    image: cassandra:5.0
    container_name: cassandra
    ports:
      - "9042:9042"
      - "9160:9160"
    environment:
      CASSANDRA_CLUSTER_NAME: "BigDataCluster"
      CASSANDRA_NUM_TOKENS: 8
      CASSANDRA_START_RPC: "true"
      CASSANDRA_SEEDS: "cassandra"
      CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"
      MAX_HEAP_SIZE: 2G
      HEAP_NEWSIZE: 1G
    volumes:
      - cassandra_data:/var/lib/cassandra
    networks:
      - net
    restart: always

# Top-level sections: they must come after all services, not between them.
networks:
  net:

volumes:
  kafka_data:
  cassandra_data:
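
Once the containers are started, it can be handy to check from the host that the published ports answer before moving on. The following is a small sketch using only the JDK; the class name PortProbe and the one-second timeout are arbitrary choices, and the ports are the ones published in the compose file above. A successful TCP connection only shows that something is listening, not that the service is fully initialized.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortProbe {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, ...
            return false;
        }
    }

    public static void main(String[] args) {
        // Spark master, Spark web UI, Kafka, Cassandra (ports from docker-compose.yml)
        int[] ports = {7077, 8080, 9092, 9042};
        for (int port : ports) {
            System.out.println("localhost:" + port + " reachable: "
                    + isReachable("localhost", port, 1000));
        }
    }
}
```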

Now create a project named spark-processor with the following directory tree:

spark-processor/
├── pom.xml                          # Maven file with all dependencies (Spark 3.5.1, Kafka 3.7.0, Cassandra 5.0)
├── README.md
├── src/
│   └── main/
│       ├── java/
│       │   └── tn/
│       │       └── enit/
│       │           └── tp4/
│       │               ├── processor/
│       │               │   ├── StreamProcessor.java        # Main Structured Streaming class
│       │               │   └── ProcessorUtils.java        # Utilities for Spark, HDFS, and Cassandra
│       │               │
│       │               ├── entity/
│       │               │   ├── SensorData.java
│       │               │   ├── Temperature.java
│       │               │   ├── Humidity.java
│       │               │   └── AverageData.java
│       │               │
│       │               └── util/
│       │                   ├── PropertyFileReader.java
│       │                   └── SensorDataDeserializer.java
│       │
│       └── resources/
│           ├── spark-processor.properties                # Local or cluster properties
│           └── log4j2.xml                                 # Log4j 2 configuration
└── target/                                                # Generated by Maven (final jar)
    └── spark-processor-1.0.0.jar

For this project, we will use the following versions:

  • Spark 3.5.1 (compatible with Hadoop 3.x and Kafka 3.x)
  • Kafka 3.7.1 (compatible with Spark Structured Streaming)
  • Cassandra 5.0

Processing with Spark Streaming

Copy the following pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tn.enit.tp4.spark</groupId>
    <artifactId>spark-processor</artifactId>
    <version>1.0.0</version>
    <name>Spark processor for speed layer</name>

    <properties>
        <spark.version>3.5.1</spark.version>
        <scala.binary.version>2.12</scala.binary.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>

        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL (required for Structured Streaming) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Structured Streaming + Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Kafka Clients latest -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.0</version>
        </dependency>

        <!-- Cassandra Connector compatible Spark 3.5.x -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
            <version>3.5.0</version>
        </dependency>

        <!-- Time utilities -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.12.7</version>
        </dependency>

        <!-- log4j secure -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.23.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.23.1</version>
        </dependency>

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>

        <plugins>

            <!-- JAVA 11 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>

            <!-- UBER JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
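                            <transformers>
                                <!-- Merges META-INF/services files so that data sources such as "kafka" stay registered in the uber jar -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>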
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>tn.enit.tp4.processor.StreamProcessor</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>

Under the package tn.enit.tp4, create the following packages:

  • entity
  • processor
  • util

In the entity package, add the file AverageData.java:

package tn.enit.tp4.entity;

import java.io.Serializable;

public class AverageData implements Serializable {

	private String id;
	private double temperature;

	private double humidity;

	public AverageData() {

	}

	public AverageData(String id, double temperature, double humidity) {
		super();
		this.id = id;
		this.temperature = temperature;
		this.humidity = humidity;
	}

	public String getId() {
		return id;
	}

	public double getTemperature() {
		return temperature;
	}

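	public double getHumidity() {
		return humidity;
	}

	// Setters: bean-based tools such as Encoders.bean need writable properties
	public void setId(String id) { this.id = id; }
	public void setTemperature(double temperature) { this.temperature = temperature; }
	public void setHumidity(double humidity) { this.humidity = humidity; }
}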

In the entity package, add the file Humidity.java:

package tn.enit.tp4.entity;

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Humidity implements Serializable {

	private String id;
	private double value;
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
	private Date timestamp;

	public Humidity() {

	}

	public Humidity(String id, double value, Date timestamp) {
		super();
		this.id = id;
		this.value = value;
		this.timestamp = timestamp;
	}

	public String getId() {
		return id;
	}

	public double getValue() {
		return value;
	}

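	public Date getTimestamp() {
		return timestamp;
	}

	// Setters: bean-based tools such as Encoders.bean need writable properties
	public void setId(String id) { this.id = id; }
	public void setValue(double value) { this.value = value; }
	public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }

}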

In the entity package, add the file SensorData.java:

package tn.enit.tp4.entity;

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class SensorData implements Serializable {

	private String id;
	private double temperature;
	private double humidity;
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
	private Date timestamp;

	public SensorData() {

	}

	public SensorData(String id, double temperature, double humidity, Date timestamp) {
		super();
		this.id = id;
		this.temperature = temperature;
		this.humidity = humidity;
		this.timestamp = timestamp;
	}

	public String getId() {
		return id;
	}

	public double getTemperature() {
		return temperature;
	}

	public double getHumidity() {
		return humidity;
	}

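	public Date getTimestamp() {
		return timestamp;
	}

	// Setters: needed by Jackson (SensorDataDeserializer) and by Encoders.bean
	public void setId(String id) { this.id = id; }
	public void setTemperature(double temperature) { this.temperature = temperature; }
	public void setHumidity(double humidity) { this.humidity = humidity; }
	public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }

}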

In the entity package, add the file Temperature.java:

package tn.enit.tp4.entity;

import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Temperature implements Serializable{
	
	private String id;
	private double value;
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone="MST")
	private Date timestamp;
	
	public Temperature(){
		
	}

	public Temperature(String id, double value, Date timestamp) {
		super();
		this.id = id;
		this.value = value;
		this.timestamp = timestamp;
	}

	public String getId() {
		return id;
	}

	public double getValue() {
		return value;
	}

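	public Date getTimestamp() {
		return timestamp;
	}

	// Setters: bean-based tools such as Encoders.bean need writable properties
	public void setId(String id) { this.id = id; }
	public void setValue(double value) { this.value = value; }
	public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }

}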
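
The entity classes are handled as JavaBeans: ProcessorUtils passes them to Encoders.bean(...), and SensorDataDeserializer hands SensorData to Jackson. Under the JavaBean convention, a property can only be populated through a setter, which is why getters alone are usually not enough. The sketch below uses the JDK's java.beans.Introspector to list the properties that are both readable and writable. The two nested classes are hypothetical stand-ins for an entity with and without setters; the sketch does not reproduce what Spark or Jackson do internally.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical check: which bean properties have both a getter and a setter?
final class BeanCheck {

    // Stand-in for an entity that only exposes a getter.
    public static class GetterOnly {
        private double value;
        public double getValue() { return value; }
    }

    // Stand-in for an entity that exposes a getter and a setter.
    public static class GetterAndSetter {
        private double value;
        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }
    }

    static List<String> readWriteProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            // Object.class as stop class excludes the implicit "class" property.
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(readWriteProperties(GetterOnly.class));      // prints []
        System.out.println(readWriteProperties(GetterAndSetter.class)); // prints [value]
    }
}
```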

In the processor package, add the file ProcessorUtils.java:

package tn.enit.tp4.processor;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.*;

import tn.enit.tp4.entity.AverageData;
import tn.enit.tp4.entity.SensorData;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

public class ProcessorUtils {

    // Spark configuration (Structured Streaming)
    public static SparkSession createSparkSession(Properties prop) {
        SparkConf conf = new SparkConf()
                .setAppName(prop.getProperty("tn.enit.tp4.spark.app.name"))
                .setMaster(prop.getProperty("tn.enit.tp4.spark.master"))
                .set("spark.cassandra.connection.host", prop.getProperty("tn.enit.tp4.cassandra.host"))
                .set("spark.cassandra.connection.port", prop.getProperty("tn.enit.tp4.cassandra.port"))
                .set("spark.cassandra.auth.username", prop.getProperty("tn.enit.tp4.cassandra.username"))
                .set("spark.cassandra.auth.password", prop.getProperty("tn.enit.tp4.cassandra.password"));

        if ("local".equals(prop.getProperty("tn.enit.tp4.env"))) {
            conf.set("spark.driver.bindAddress", "127.0.0.1");
        }

        return SparkSession.builder()
                .config(conf)
                .getOrCreate();
    }

    // Read data from Kafka as a structured stream
    public static Dataset<Row> readFromKafka(SparkSession spark, String kafkaBootstrap, String topic) {

        return spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaBootstrap)
                .option("subscribe", topic)
                .option("startingOffsets", "earliest")
                .load()
                .selectExpr(
                        "CAST(key AS STRING)",
                        "CAST(value AS STRING)",
                        "timestamp"
                );
    }

    // Parse the JSON string into a Dataset of SensorData
    public static Dataset<SensorData> parseSensorData(Dataset<Row> kafkaDF) {

        // Assumes the value column holds JSON whose first three fields are, in this order:
        // {"id":"...","temperature":..,"humidity":..}
        return kafkaDF.selectExpr("CAST(value AS STRING) AS json_str")
                .as(Encoders.STRING())
                // The cast selects the Java overload of map(); a bare lambda is ambiguous.
                .map((MapFunction<String, SensorData>) json -> {
                    // Naive parsing (Gson or Jackson would be more robust)
                    String[] parts = json.replaceAll("[{}\"]", "").split(",");
                    String id = parts[0].split(":")[1];
                    double temperature = Double.parseDouble(parts[1].split(":")[1]);
                    double humidity = Double.parseDouble(parts[2].split(":")[1]);
                    return new SensorData(id, temperature, humidity, java.sql.Date.valueOf(LocalDate.now()));
                }, Encoders.bean(SensorData.class));
    }

    // Write a stream to Cassandra (Structured Streaming).
    // Generic so that it accepts Temperature and Humidity datasets as well as SensorData.
    // The query is started but not awaited here, so that several sinks can run side by side;
    // the caller blocks on spark.streams().awaitAnyTermination().
    public static <T> void writeToCassandra(Dataset<T> ds, String keyspace, String table) {
        try {
            ds.writeStream()
                    .foreachBatch((VoidFunction2<Dataset<T>, Long>) (batchDF, batchId) ->
                            batchDF.write()
                                    .format("org.apache.spark.sql.cassandra")
                                    .option("keyspace", keyspace)
                                    .option("table", table)
                                    .mode("append")
                                    .save())
                    .outputMode("update")
                    .start();
        } catch (TimeoutException e) {
            throw new IllegalStateException("Could not start the Cassandra streaming query", e);
        }
    }

    // Write SensorData to HDFS as Parquet (started but not awaited, see writeToCassandra)
    public static void writeToHDFS(Dataset<SensorData> sensorDS, String savePath) {
        try {
            sensorDS.writeStream()
                    .foreachBatch((VoidFunction2<Dataset<SensorData>, Long>) (batchDF, batchId) ->
                            batchDF.write()
                                    .mode(SaveMode.Append)
                                    .parquet(savePath))
                    .outputMode("append")
                    .start();
        } catch (TimeoutException e) {
            throw new IllegalStateException("Could not start the HDFS streaming query", e);
        }
    }

    // Compute average temperature and humidity (batch)
    public static AverageData computeAverage(SparkSession spark, String hdfsPath) {
        Dataset<SensorData> df = spark.read().parquet(hdfsPath).as(Encoders.bean(SensorData.class));

        double avgTemp = df.agg(functions.avg("temperature")).first().getDouble(0);
        double avgHum = df.agg(functions.avg("humidity")).first().getDouble(0);

        return new AverageData("0", avgTemp, avgHum);
    }

    // Run batch processing to calculate average temperature and humidity
    public static List<AverageData> runBatch(SparkSession sparkSession, String saveFile) {
        System.out.println("Running Batch Processing");

        var dataFrame = sparkSession.read().parquet(saveFile);
        System.out.println(dataFrame);
        // Map rows to SensorData with the bean encoder, as in computeAverage
        JavaRDD<SensorData> rdd = dataFrame.as(Encoders.bean(SensorData.class)).javaRDD();

        JavaRDD<Double> temp = rdd.map(SensorData::getTemperature);
        JavaRDD<Double> hum = rdd.map(SensorData::getHumidity);

        // Sum first, then divide by the number of readings
        double avg_temp = temp.reduce((value1, value2) -> value1 + value2);
        double avg_hum = hum.reduce((value1, value2) -> value1 + value2);

        long length = temp.count();

        avg_temp /= length;
        avg_hum /= length;

        System.out.println("Avg temp : " + avg_temp);
        System.out.println("Avg hum : " + avg_hum);

        AverageData d = new AverageData("0", avg_temp, avg_hum);
        List<AverageData> average_data_list = new ArrayList<>();
        average_data_list.add(d);

        return average_data_list;
    }

}
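
To see what the string manipulation in parseSensorData does to a single Kafka message, the same steps can be run outside Spark. The sketch below is plain Java; the class name and the sample message are assumptions made for illustration. It shows that this approach depends on the field order (id, temperature, humidity first) and on the first three values containing no comma or colon.

```java
// Hypothetical stand-alone version of the parsing steps used in parseSensorData.
final class NaiveJsonParse {

    // Returns { id, temperature, humidity } as raw strings.
    static String[] parse(String json) {
        // Strip braces and quotes, then split into "key:value" chunks.
        String[] parts = json.replaceAll("[{}\"]", "").split(",");
        String id = parts[0].split(":")[1];
        String temperature = parts[1].split(":")[1];
        String humidity = parts[2].split(":")[1];
        return new String[] { id, temperature, humidity };
    }

    public static void main(String[] args) {
        // Sample message (assumed shape, matching the comment in parseSensorData)
        String msg = "{\"id\":\"s1\",\"temperature\":21.5,\"humidity\":40.0,"
                + "\"timestamp\":\"2024-01-01 10:00:00\"}";
        System.out.println(String.join(" | ", parse(msg))); // prints s1 | 21.5 | 40.0
    }
}
```

If the producer ever changes the field order, the indices above silently pick the wrong values, which is the main argument for a real JSON parser.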

In the processor package, add the file StreamProcessor.java:

package tn.enit.tp4.processor;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;

import tn.enit.tp4.entity.SensorData;
import tn.enit.tp4.entity.Temperature;
import tn.enit.tp4.entity.Humidity;

import tn.enit.tp4.util.PropertyFileReader;

import java.io.IOException;
import java.util.Properties;

public class StreamProcessor {

    public static void main(String[] args) throws StreamingQueryException, IOException {

        // Load the properties
        String file = "spark-processor.properties";
        Properties prop = PropertyFileReader.readPropertyFile(file);

        // Create the SparkSession
        SparkSession spark = ProcessorUtils.createSparkSession(prop);

        String kafkaBootstrap = prop.getProperty("tn.enit.tp4.brokerlist");
        String kafkaTopic = prop.getProperty("tn.enit.tp4.topic");
        String cassandraKeyspace = "sensordatakeyspace";

        // Read from Kafka
        Dataset<Row> kafkaRawDF = ProcessorUtils.readFromKafka(spark, kafkaBootstrap, kafkaTopic);

        // Convert to a Dataset of SensorData
        Dataset<SensorData> sensorDS = ProcessorUtils.parseSensorData(kafkaRawDF);

        // Split into Temperature and Humidity
        // (the casts select the Java overload of map(); a bare lambda is ambiguous)
        Dataset<Temperature> tempDS = sensorDS.map(
                (MapFunction<SensorData, Temperature>) s ->
                        new Temperature(s.getId(), s.getTemperature(), s.getTimestamp()),
                Encoders.bean(Temperature.class)
        );

        Dataset<Humidity> humDS = sensorDS.map(
                (MapFunction<SensorData, Humidity>) s ->
                        new Humidity(s.getId(), s.getHumidity(), s.getTimestamp()),
                Encoders.bean(Humidity.class)
        );

        // Write to Cassandra
        ProcessorUtils.writeToCassandra(tempDS, cassandraKeyspace, "temperature");
        ProcessorUtils.writeToCassandra(humDS, cassandraKeyspace, "humidity");

        // Write to HDFS
        String hdfsPath = prop.getProperty("tn.enit.tp4.hdfs.path");
        if (hdfsPath != null && !hdfsPath.isEmpty()) {
            ProcessorUtils.writeToHDFS(sensorDS, hdfsPath);
        }

        // Wait for the streaming queries to terminate
        spark.streams().awaitAnyTermination();
    }
}

In the util package, create the file PropertyFileReader.java:

package tn.enit.tp4.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Utility class to read property files
 */
public class PropertyFileReader {

    private static final Logger logger = LogManager.getLogger(PropertyFileReader.class);

    private static Properties prop = new Properties();

    public static Properties readPropertyFile(String file) throws IOException {
        if (prop.isEmpty()) {
            try (InputStream input = PropertyFileReader.class.getClassLoader().getResourceAsStream(file)) {
                if (input == null) {
                    throw new IOException("Property file '" + file + "' not found in the classpath");
                }
                prop.load(input);
                logger.info("Properties file '{}' loaded successfully", file);
            } catch (IOException ex) {
                logger.error("Error loading property file '{}': {}", file, ex.getMessage());
                throw ex;
            }
        }
        return prop;
    }
}
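
The Properties object returned by readPropertyFile only holds strings, so numeric settings such as the Cassandra port have to be converted by the caller. Here is a minimal sketch of one way to do that with a default value. The helper names intProperty and fromString are hypothetical, and the properties are loaded from an in-memory string so that the example runs without the lab's resource files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helpers for reading typed values out of java.util.Properties.
final class PropertyAccessSketch {

    // Returns the property as an int, or defaultValue when the key is absent.
    static int intProperty(Properties prop, String key, int defaultValue) {
        String raw = prop.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    // Loads properties from a string instead of a classpath resource.
    static Properties fromString(String content) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(content));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = fromString(
                "tn.enit.tp4.cassandra.host=cassandra\n"
              + "tn.enit.tp4.cassandra.port=9042\n");
        System.out.println(prop.getProperty("tn.enit.tp4.cassandra.host"));              // prints cassandra
        System.out.println(intProperty(prop, "tn.enit.tp4.cassandra.port", 9042));       // prints 9042
        System.out.println(intProperty(prop, "tn.enit.tp4.cassandra.keep_alive", 10000)); // prints 10000
    }
}
```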

In the util package, create the file SensorDataDeserializer.java:

package tn.enit.tp4.util;

import tn.enit.tp4.entity.SensorData;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

public class SensorDataDeserializer implements Deserializer<SensorData> {

    private static final Logger logger = LogManager.getLogger(SensorDataDeserializer.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No specific configuration
    }

    @Override
    public SensorData deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, SensorData.class);
        } catch (Exception e) {
            logger.error("Error deserializing SensorData from topic {}: {}", topic, e.getMessage(), e);
            return null;
        }
    }

    @Override
    public void close() {
        // No resources to close
    }
}

In the resources folder, add the following files. First, spark-processor-local.properties:

# -----------------------------
# Kafka properties
# -----------------------------
tn.enit.tp4.brokerlist=localhost:29092
tn.enit.tp4.topic=sensor-data
tn.enit.tp4.resetType=earliest

# -----------------------------
# Spark properties
# -----------------------------
tn.enit.tp4.spark.app.name=Iot Data Processor
tn.enit.tp4.spark.master=local[*]
tn.enit.tp4.spark.checkpoint.dir=/tmp/iot-streaming-data
tn.enit.tp4.hdfs.path=/home/IdeaProjects/spark-processor/data/
tn.enit.tp4.jar.path=/home/IdeaProjects/spark-processor/target/spark-processor-1.0.0.jar

# -----------------------------
# Cassandra properties
# -----------------------------
tn.enit.tp4.cassandra.host=127.0.0.1
tn.enit.tp4.cassandra.port=9042
tn.enit.tp4.cassandra.keep_alive=10000
tn.enit.tp4.cassandra.username=cassandra
tn.enit.tp4.cassandra.password=cassandra
tn.enit.tp4.cassandra.keyspace=sensordatakeyspace

# -----------------------------
# Miscellaneous
# -----------------------------
tn.enit.tp4.env=local

and then the file spark-processor.properties:

# -----------------------------
# Kafka properties
# -----------------------------
tn.enit.tp4.brokerlist=kafka:9092
tn.enit.tp4.topic=sensor-data
tn.enit.tp4.resetType=earliest
# ZooKeeper is no longer required by recent Kafka clients
# tn.enit.tp4.kafka.zookeeper=zookeeper:2181

# -----------------------------
# Spark properties
# -----------------------------
tn.enit.tp4.spark.app.name=Iot Data Processor
tn.enit.tp4.spark.master=spark://spark-master:7077
tn.enit.tp4.spark.checkpoint.dir=hdfs://namenode:8020/lambda-arch/checkpoint
tn.enit.tp4.hdfs.path=hdfs://namenode:8020/lambda-arch/
tn.enit.tp4.jar.path=spark-processor-1.0.0.jar

# -----------------------------
# Cassandra properties
# -----------------------------
tn.enit.tp4.cassandra.host=cassandra
tn.enit.tp4.cassandra.port=9042
tn.enit.tp4.cassandra.keep_alive=10000
tn.enit.tp4.cassandra.username=cassandra
tn.enit.tp4.cassandra.password=cassandra
tn.enit.tp4.cassandra.keyspace=sensordatakeyspace

# -----------------------------
# Miscellaneous
# -----------------------------
tn.enit.tp4.env=cluster
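
As listed, StreamProcessor always loads spark-processor.properties, so the local file is only used if the file name is changed by hand. One possible way to switch between the two files without editing the code is sketched below. The system property name tp4.env and the class PropertyFileSelector are assumptions made for this example; they are not part of the lab code.

```java
// Hypothetical helper: picks the properties file from an environment name.
final class PropertyFileSelector {

    static String fileFor(String env) {
        // "local" selects the local file; anything else falls back to the cluster file.
        return "local".equals(env)
                ? "spark-processor-local.properties"
                : "spark-processor.properties";
    }

    public static void main(String[] args) {
        // Example: java -Dtp4.env=local ... (property name is an assumption)
        String env = System.getProperty("tp4.env", "cluster");
        System.out.println(fileFor(env));
    }
}
```

The returned name could then be passed to PropertyFileReader.readPropertyFile(...) in place of the hard-coded file name.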

In the end, your project tree should look like the spark-processor tree shown earlier in this exercise.

Producing data with Kafka

Warning ⚠️ Java 8 is installed in the kafka container! You need to build a jar that targets this version. To do so, set this version in the pom.xml, as shown in the file below ⬇️.

Now create a kafka-producer project with the following directory tree:

kafka-producer/
├── pom.xml                           # Kafka, Jackson, SLF4J, and JUnit dependencies
├── README.md
├── src/
│   └── main/
│       ├── java/
│       │   └── tn/
│       │       └── enit/
│       │           └── tp4/
│       │               └── kafka/
│       │                   ├── PropertyFileReader.java    # Reads the Kafka properties
│       │                   ├── SensorData.java            # SensorData entity
│       │                   ├── SensorDataEncoder.java     # Kafka serializer
│       │                   └── SensorDataProducer.java    # Main Kafka producer
│       │
│       └── resources/
│           └── kafka-producer.properties                  # Kafka configuration
└── target/                                                 # Jar generated after compilation
    └── kafka-producer-1.0.0.jar

In the project, create pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tn.enit.tp4.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>1.0.0</version>
    <name>Kafka Producer</name>

    <properties>
        <kafka.version>3.7.0</kafka.version> <!-- recent version compatible with Java 8 -->
        <jackson.version>2.15.2</jackson.version>
        <slf4j.version>2.0.9</slf4j.version>
        <junit.version>5.10.0</junit.version>
    </properties>

    <dependencies>

        <!-- Kafka Clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- Jackson dependencies for JSON handling -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <!-- Compiler plugin for Java 8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <!-- Maven Shade Plugin for creating a fat JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>tn.enit.tp4.kafka.SensorDataProducer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

We now create the package tn.enit.tp4.kafka, starting with PropertyFileReader.java:

package tn.enit.tp4.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertyFileReader {
    private static Properties prop = new Properties();

    public static Properties readPropertyFile() throws Exception {
        if (prop.isEmpty()) {
            try (InputStream input = PropertyFileReader.class.getClassLoader()
                    .getResourceAsStream("kafka-producer.properties")) {
                
                if (input == null) {
                    throw new IOException("File kafka-producer.properties not found on the classpath");
                }
                
                prop.load(input);
            } catch (IOException ex) {
                System.err.println("Error while reading the properties file: " + ex.getMessage());
                throw ex;
            }
        }
        return prop;
    }
}

Create the file SensorData.java:

package tn.enit.tp4.kafka;

import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;

public class SensorData implements Serializable {

    private String id;
    private double temperature;
    private double humidity;

    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
    private Date timestamp;

    public SensorData() { }

    public SensorData(String id, double temperature, double humidity, Date timestamp) {
        this.id = id;
        this.temperature = temperature;
        this.humidity = humidity;
        this.timestamp = timestamp;
    }

    // Getters
    public String getId() { return id; }
    public double getTemperature() { return temperature; }
    public double getHumidity() { return humidity; }
    public Date getTimestamp() { return timestamp; }

    // Setters (needed by Jackson)
    public void setId(String id) { this.id = id; }
    public void setTemperature(double temperature) { this.temperature = temperature; }
    public void setHumidity(double humidity) { this.humidity = humidity; }
    public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "SensorData{" +
                "id='" + id + '\'' +
                ", temperature=" + temperature +
                ", humidity=" + humidity +
                ", timestamp=" + timestamp +
                '}';
    }
}
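To see what the timestamp field looks like once serialized, the sketch below applies the same pattern and time zone as the @JsonFormat annotation, using only the JDK. The class name TimestampFormatSketch is hypothetical, and the sketch only mimics the formatting; it does not call Jackson.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class TimestampFormatSketch {

    // Formats a date with the pattern "yyyy-MM-dd HH:mm:ss" in the MST time zone,
    // i.e. the same settings as the @JsonFormat annotation on SensorData.timestamp.
    static String format(Date date) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
        fmt.setTimeZone(TimeZone.getTimeZone("MST"));
        return fmt.format(date);
    }

    public static void main(String[] args) {
        System.out.println(format(new Date(0L))); // the Unix epoch, seen from MST
        System.out.println(format(new Date()));   // the current time, seen from MST
    }
}
```

Because the time zone is fixed to MST, the string stored in the JSON is generally offset from your local clock; keep this in mind when comparing timestamps in Cassandra.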

Create the file SensorDataSerializer.java (the file name must match the public class it contains):

package tn.enit.tp4.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class SensorDataSerializer implements Serializer<SensorData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure for now
    }

    @Override
    public byte[] serialize(String topic, SensorData data) {
        try {
            String msg = objectMapper.writeValueAsString(data);
            System.out.println("Serialized: " + msg);
            return msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            System.err.println("Error in serialization: " + e.getMessage());
            return null;
        }
    }

    @Override
    public void close() {
        // Nothing to release
    }
}

Create the file SensorDataProducer.java:

package tn.enit.tp4.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.Date;

public class SensorDataProducer {

    private final KafkaProducer<String, SensorData> producer;

    public SensorDataProducer(KafkaProducer<String, SensorData> producer) {
        this.producer = producer;
    }

    public static void main(String[] args) throws Exception {
        // Load the producer settings from kafka-producer.properties
        Properties properties = PropertyFileReader.readPropertyFile();

        // Configure the Kafka producer (the keys must match those in the properties file)
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("bootstrap.servers"));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "tn.enit.tp4.kafka.SensorDataSerializer");
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, properties.getProperty("acks", "1"));

        KafkaProducer<String, SensorData> producer = new KafkaProducer<>(kafkaProps);
        SensorDataProducer iotProducer = new SensorDataProducer(producer);

        iotProducer.generateIoTEvent(properties.getProperty("kafka.topic"));
    }

    private void generateIoTEvent(String topic) throws InterruptedException {
        Random rand = new Random();
        double init_val_temp = 20;
        double init_val_hum = 80;

        System.out.println("Sending events...");

        while (true) {
            SensorData event = generateSensorData(rand, init_val_temp, init_val_hum);
            System.out.println("Sent: " + event.getId() + " Temp:" + event.getTemperature() + " Hum:" + event.getHumidity());
            
            // Send the event to Kafka, keyed by its id
            producer.send(new ProducerRecord<>(topic, event.getId(), event));

            Thread.sleep(rand.nextInt(3000) + 2000); // random delay between 2 and 5 seconds
        }
    }

    private SensorData generateSensorData(final Random rand, double temp, double hum) {
        String id = UUID.randomUUID().toString();
        Date timestamp = new Date();
        double t = temp + rand.nextDouble() * 10;
        double h = hum + rand.nextDouble() * 10;

        return new SensorData(id, t, h, timestamp);
    }
}
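The value ranges produced by the generator are easy to misread, so here is a minimal sketch that isolates the three random expressions used in SensorDataProducer. The class name SensorRangeSketch and its method names are hypothetical; the arithmetic is copied from the producer.

```java
import java.util.Random;

final class SensorRangeSketch {

    // Same expression as generateSensorData with temp = 20: values fall in [20, 30).
    static double temperature(Random rand) {
        return 20 + rand.nextDouble() * 10;
    }

    // Same expression as generateSensorData with hum = 80: values fall in [80, 90).
    static double humidity(Random rand) {
        return 80 + rand.nextDouble() * 10;
    }

    // Same expression as the Thread.sleep argument: 2000 to 4999 milliseconds.
    static int delayMillis(Random rand) {
        return rand.nextInt(3000) + 2000;
    }

    public static void main(String[] args) {
        Random rand = new Random(42); // fixed seed only to make the demo repeatable
        for (int i = 0; i < 3; i++) {
            System.out.printf("temp=%.2f hum=%.2f delay=%dms%n",
                    temperature(rand), humidity(rand), delayMillis(rand));
        }
    }
}
```

These bounds are also why the dashboard later falls back to 20 and 80 as default values: they are the lower ends of the simulated ranges.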


In the resources folder (src/main/resources), add the file kafka-producer.properties:

# Kafka properties (modern Kafka Producer)
# Inside Docker: kafka:9092; from the host: localhost:29092
bootstrap.servers=localhost:29092
acks=1
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=tn.enit.tp4.kafka.SensorDataSerializer
kafka.topic=sensor-data
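The following sketch shows how java.util.Properties parses a file like this one and what a lookup returns when a key is missing. It parses an inline string instead of the classpath resource so that it runs on its own; the class name ProducerPropertiesSketch and the key no.such.key are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ProducerPropertiesSketch {

    // Parses properties text (the same key=value syntax as kafka-producer.properties).
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "bootstrap.servers=localhost:29092\n"
              + "acks=1\n"
              + "kafka.topic=sensor-data\n");

        System.out.println(props.getProperty("bootstrap.servers")); // value after the first '='
        System.out.println(props.getProperty("no.such.key"));       // null: key absent from the file
        System.out.println(props.getProperty("no.such.key", "1"));  // falls back to the default
    }
}
```

A null value handed to the Kafka producer configuration fails at start-up, so it is worth checking that every key read in the code is spelled exactly as in the file.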

Create a data folder and, inside it, the file schema.cql:

//Create keyspace
CREATE KEYSPACE IF NOT EXISTS sensordatakeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

//Create table
CREATE TABLE sensordatakeyspace.temperature (id text , timeStamp timestamp, value double, PRIMARY KEY (id));

CREATE TABLE sensordatakeyspace.humidity (id text , timeStamp timestamp, value double, PRIMARY KEY (id));

CREATE TABLE sensordatakeyspace.averagedata (id text , temperature double, humidity double, PRIMARY KEY (id));

To run your speed layer in containers, run:

docker-compose up -d

⚠️ Warning: a container cannot start if one of its published ports is already used by another application on the host. For example, if the spark-worker-1 container fails to start because port 8081 is taken, run this command to find out which process is using it:

sudo lsof -i :8081

Once you have identified the process, stop it with sudo kill -9 <PID>
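If lsof is not available on your machine, a port can also be probed from Java. The sketch below is only an illustration under that assumption: it reports whether a port can be bound, not which process holds it. The class name PortCheckSketch is hypothetical; the port numbers are the ones published in docker-compose.yml.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class PortCheckSketch {

    // Returns true if a server socket can be bound to the port right now.
    // The socket is closed immediately, so the port is left free afterwards.
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false; // typically "Address already in use"
        }
    }

    public static void main(String[] args) {
        int[] ports = {7077, 8080, 8081, 8082, 9092};
        for (int port : ports) {
            System.out.println(port + " -> " + (isPortFree(port) ? "free" : "in use"));
        }
    }
}
```

Run it before docker-compose up -d; any port reported as in use needs to be released (or remapped in docker-compose.yml) first.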

First, create the following directories on HDFS. They are essential: Spark Streaming writes its checkpoints (including the DAG metadata) there.

docker exec namenode hdfs dfs -rm -r -f /lambda-arch
docker exec namenode hdfs dfs -mkdir -p /lambda-arch/checkpoint
docker exec namenode hdfs dfs -chmod -R 777 /lambda-arch

You must also create the schema in Cassandra:

docker exec cassandra-iot cqlsh --username cassandra --password cassandra -f /schema.cql

Start the Kafka data producer from Docker by copying the jar file into the kafka container. From the target folder of the kafka-producer project, run:

docker cp kafka-producer-1.0.0.jar kafka:/

Then:

docker exec -it kafka java -jar /kafka-producer-1.0.0.jar

You should see output similar to the following: [image]

Do the same for the spark-master container: copy the jar file built in the spark-processor project with the command:

docker cp spark-processor-1.0.0.jar spark-master:/

Start the stream processor:

docker exec spark-master /opt/bitnami/spark/bin/spark-submit --class tn.enit.tp4.processor.StreamProcessor /spark-processor-1.0.0.jar

To view the results, connect to Cassandra:

docker exec -it cassandra-iot cqlsh -u cassandra -p cassandra

Then run:

DESCRIBE KEYSPACES;
SELECT * FROM sensordatakeyspace.temperature;
SELECT * FROM sensordatakeyspace.humidity;

You should get results similar to these ⬇️ [image]

Exercise 2: Batch processing layer

In the spark-processor project, add the class BatchProcessor.java to the processor package:

package tn.enit.tp4.processor;

import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import tn.enit.tp4.entity.AverageData;
import tn.enit.tp4.util.PropertyFileReader;

public class BatchProcessor {

    public static void main(String[] args) throws Exception {

        // Load the properties
        String file = "spark-processor.properties";
        Properties prop = PropertyFileReader.readPropertyFile(file);

        // Spark configuration
        SparkConf conf = ProcessorUtils.getSparkConf(prop);
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create the SparkSession (it reuses the context created above)
        SparkSession sparkSession = SparkSession.builder()
                .appName(prop.getProperty("tn.enit.tp4.spark.app.name"))
                .config(conf)
                .getOrCreate();

        // HDFS path of the stored data
        String saveFile = prop.getProperty("tn.enit.tp4.hdfs") + "iot-data";

        // Compute the averages with the batch processor
        List<AverageData> averageDataList = ProcessorUtils.runBatch(sparkSession, saveFile);

        // Convert to an RDD and save to Cassandra
        JavaRDD<AverageData> rdd = sc.parallelize(averageDataList);
        ProcessorUtils.saveAvgToCassandra(rdd);

        // Stop Spark
        sparkSession.stop();
        sc.close();
    }
}
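ProcessorUtils.runBatch is not shown in this section. As a rough idea of the result the batch layer is expected to produce, the sketch below computes the mean temperature and humidity over a small in-memory list in plain Java. It assumes the batch reduces to arithmetic means; the class BatchAverageSketch and its nested Reading type are hypothetical and do not use Spark.

```java
import java.util.Arrays;
import java.util.List;

final class BatchAverageSketch {

    // Minimal stand-in for one stored sensor reading.
    static final class Reading {
        final double temperature;
        final double humidity;

        Reading(double temperature, double humidity) {
            this.temperature = temperature;
            this.humidity = humidity;
        }
    }

    // Returns {mean temperature, mean humidity}; NaN for both if the list is empty.
    static double[] average(List<Reading> readings) {
        double temp = readings.stream().mapToDouble(r -> r.temperature).average().orElse(Double.NaN);
        double hum = readings.stream().mapToDouble(r -> r.humidity).average().orElse(Double.NaN);
        return new double[] {temp, hum};
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading(20.0, 80.0),
                new Reading(30.0, 90.0));
        double[] avg = average(readings);
        System.out.println("avg temperature=" + avg[0] + ", avg humidity=" + avg[1]);
    }
}
```

In the real job the same aggregation would run over the data stored on HDFS, and the result is what ends up in the averagedata table.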

Adding the serving layer

Now create the dashboard project in the IDE of your choice, with the following directory structure:

dashboard/
├── pom.xml
└── src/
    ├── main/
    │   ├── java/
    │   │   └── com/
    │   │       └── bigdata/
    │   │           └── dashboard/
    │   │               ├── SensorDataDashboard.java
    │   │               ├── entity/
    │   │               │   ├── AverageData.java
    │   │               │   ├── Temperature.java
    │   │               │   └── Humidity.java
    │   │               ├── repository/
    │   │               │   ├── AverageDataRepository.java
    │   │               │   ├── TemperatureRepository.java
    │   │               │   └── HumidityRepository.java
    │   │               ├── service/
    │   │               │   ├── AverageService.java
    │   │               │   └── DataService.java
    │   │               ├── utils/
    │   │               │   └── PropertyFileReader.java
    │   │               ├── config/
    │   │               │   └── WebSocketConfig.java
    │   │               └── websocket/
    │   │                   └── Response.java
    │   └── resources/
    │       ├── application.properties
    │       ├── iot-springboot.properties
    │       └── static/
    │           ├── index.html
    │           └── js/
    │               ├── jquery-1.12.4.min.js
    │               ├── sockjs-1.1.1.min.js
    │               └── stomp.min.js
    └── test/
        └── java/
            └── com/
                └── bigdata/
                    └── dashboard/
                        └── (tests)

Start with pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
		                     http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

    <groupId>com.bigdata.dashboard</groupId>
    <artifactId>dashboard</artifactId>
    <version>1.0.0</version>
    <name>Spring Boot Dashboard</name>

    <properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<!-- WebSocket support -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- Cassandra support -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-cassandra</artifactId>
		</dependency>

		<!-- Testing -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
    </dependencies>

	<build>
		<plugins>
			<!-- Spring Boot Maven plugin -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>

            <!-- Compiler plugin for Java 8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
	</build>

</project>

Create the following packages under com.bigdata.dashboard:

  • entity
  • repository
  • service
  • websocket
  • config
  • utils

In the entity package, create the class AverageData.java:

package com.bigdata.dashboard.entity;

import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;

import java.io.Serializable;

@Table("averagedata")
public class AverageData implements Serializable {

    @PrimaryKeyColumn(name = "id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private String id;

    @Column(value = "temperature")
    private double temperature;

    @Column(value = "humidity")
    private double humidity;

    public AverageData() {}

    public AverageData(String id, double temperature, double humidity) {
        this.id = id;
        this.temperature = temperature;
        this.humidity = humidity;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getHumidity() {
        return humidity;
    }

    @Override
    public String toString() {
        return "AverageData [id=" + id + ", temperature=" + temperature + ", humidity=" + humidity + "]";
    }
}

Then the class Humidity.java:


package com.bigdata.dashboard.entity;

import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;

import java.io.Serializable;
import java.util.Date;

@Table("humidity")
public class Humidity implements Serializable {

    @PrimaryKeyColumn(name = "id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private String id;

    @Column("value")
    private double value;

    @Column("timestamp")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
    private Date timestamp;

    public Humidity() {
    }

    public Humidity(String id, double value, Date timestamp) {
        this.id = id;
        this.value = value;
        this.timestamp = timestamp;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Humidity [id=" + id + ", value=" + value + ", timestamp=" + timestamp + "]";
    }
}

Then the class Temperature.java:

package com.bigdata.dashboard.entity;

import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;

import java.io.Serializable;
import java.util.Date;

@Table("temperature")
public class Temperature implements Serializable {

    @PrimaryKeyColumn(name = "id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private String id;

    @Column("value")
    private double value;

    @Column("timestamp")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
    private Date timestamp;

    public Temperature() {
    }

    public Temperature(String id, double value, Date timestamp) {
        this.id = id;
        this.value = value;
        this.timestamp = timestamp;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Temperature [id=" + id + ", value=" + value + ", timestamp=" + timestamp + "]";
    }
}

In the repository package, create the interface AverageDataRepository.java:


package com.bigdata.dashboard.repository;

import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;
import com.bigdata.dashboard.entity.AverageData;

import java.util.List;

@Repository
public interface AverageDataRepository extends CassandraRepository<AverageData, String> {

    // Fetch every row of the averagedata table
    List<AverageData> findAll();

    // Called by AverageService. The table has no timestamp column, so this simply
    // returns one row; it assumes the batch job keeps a single row of averages.
    @Query("SELECT * FROM averagedata LIMIT 1")
    AverageData findLatest();

    // Example of a custom query
    @Query("SELECT * FROM averagedata WHERE id=?0")
    AverageData findByIdCustom(String id);
}

Then the interface HumidityRepository.java:

package com.bigdata.dashboard.repository;

import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;
import com.bigdata.dashboard.entity.Humidity;

import java.util.Date;

@Repository
public interface HumidityRepository extends CassandraRepository<Humidity, String> {

    @Query("SELECT * FROM humidity WHERE timestamp > ?0 ALLOW FILTERING")
    Iterable<Humidity> findHumidityByDate(Date date);
}

Then the interface TemperatureRepository.java:

package com.bigdata.dashboard.repository;

import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;
import com.bigdata.dashboard.entity.Temperature;

import java.util.Date;

@Repository
public interface TemperatureRepository extends CassandraRepository<Temperature, String> {

    @Query("SELECT * FROM temperature WHERE timestamp > ?0 ALLOW FILTERING")
    Iterable<Temperature> findTemperatureByDate(Date date);
}

In the service package, create the class AverageService.java:

package com.bigdata.dashboard.service;

import com.bigdata.dashboard.entity.AverageData;
import com.bigdata.dashboard.repository.AverageDataRepository;
import com.bigdata.dashboard.websocket.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
public class AverageService {

    @Autowired
    private SimpMessagingTemplate template;

    @Autowired
    private AverageDataRepository averageDataRepository;

    @Scheduled(fixedRate = 15000)
    public void trigger() {
        AverageData data = averageDataRepository.findLatest();
        if (data == null) {
            System.out.println("No average data found");
            return;
        }

        Response response = new Response();
        response.setTemperature(data.getTemperature());
        response.setHumidity(data.getHumidity());

        this.template.convertAndSend("/topic/average", response);
    }
}

Then the class DataService.java:

package com.bigdata.dashboard.service;

import com.bigdata.dashboard.repository.TemperatureRepository;
import com.bigdata.dashboard.repository.HumidityRepository;
import com.bigdata.dashboard.websocket.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Service class to send real-time temperature and humidity data
 * to the dashboard UI using WebSocket at a fixed interval.
 */
@Service
public class DataService {

    @Autowired
    private SimpMessagingTemplate template;

    @Autowired
    private TemperatureRepository temperatureRepository;

    @Autowired
    private HumidityRepository humidityRepository;

    // Send data every 10 seconds
    @Scheduled(fixedRate = 10000)
    public void trigger() {
        System.out.println("DataService triggered");

        List<Double> temperatures = new ArrayList<>();
        List<Double> humidities = new ArrayList<>();

        // Calculate the timestamp rounded down to the start of the current minute
        long time = new Date().getTime();
        Date date = new Date(time - time % (60 * 1000));

        // Fetch data from Cassandra
        temperatureRepository.findTemperatureByDate(date)
                .forEach(e -> temperatures.add(e.getValue()));
        humidityRepository.findHumidityByDate(date)
                .forEach(e -> humidities.add(e.getValue()));

        // Use latest value or defaults
        double temperature = temperatures.isEmpty() ? 20 : temperatures.get(temperatures.size() - 1);
        double humidity = humidities.isEmpty() ? 80 : humidities.get(humidities.size() - 1);

        // Prepare WebSocket response
        Response response = new Response();
        response.setTemperature(temperature);
        response.setHumidity(humidity);

        // Send data to UI
        this.template.convertAndSend("/topic/data", response);
    }
}
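The expression time - time % (60 * 1000) used in DataService rounds a timestamp down to the start of its minute. The sketch below isolates that arithmetic so it can be checked on known values; the class name MinuteFloorSketch is hypothetical.

```java
import java.util.Date;

final class MinuteFloorSketch {

    // Rounds an epoch timestamp in milliseconds down to the start of its minute,
    // using the same arithmetic as DataService.trigger().
    static long floorToMinute(long epochMillis) {
        return epochMillis - epochMillis % 60_000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("now:          " + new Date(now));
        System.out.println("minute start: " + new Date(floorToMinute(now)));
    }
}
```

One consequence is that the Cassandra query only returns rows written since the start of the current minute. Just after a minute boundary the lists may therefore be empty, and the service sends the default values 20 and 80.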
    
The Response.java class, in the websocket package:

package com.bigdata.dashboard.websocket;

import java.io.Serializable;

/**
 * DTO used to send temperature and humidity data
 * to the dashboard over WebSocket.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private double temperature;
    private double humidity;

    public Response() {
        // default constructor
    }

    public Response(double temperature, double humidity) {
        this.temperature = temperature;
        this.humidity = humidity;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public double getHumidity() {
        return humidity;
    }

    public void setHumidity(double humidity) {
        this.humidity = humidity;
    }

    @Override
    public String toString() {
        return "Response{" +
                "temperature=" + temperature +
                ", humidity=" + humidity +
                '}';
    }
}
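On the browser side, index.html reads resp.temperature and resp.humidity from the message body. The sketch below builds by hand a JSON string of the shape the page expects, assuming Spring converts Response to JSON from its getters (Jackson is the usual default converter). The class name ResponsePayloadSketch is hypothetical, and the real payload is produced by Spring, not by this code.

```java
final class ResponsePayloadSketch {

    // Builds a JSON object with the two fields the dashboard reads.
    // Hand-written for illustration only; no JSON library is involved.
    static String toJson(double temperature, double humidity) {
        return "{\"temperature\":" + temperature + ",\"humidity\":" + humidity + "}";
    }

    public static void main(String[] args) {
        // Example values inside the ranges produced by the simulated sensors
        System.out.println(toJson(23.5, 81.0));
    }
}
```

If the field names in Response change, the JavaScript handlers in index.html must change with them, since they access the properties by name.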

The WebSocketConfig.java class, in the config package:

package com.bigdata.dashboard.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * WebSocket configuration used to push messages to the dashboard
 * over STOMP and SockJS.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Registers the STOMP endpoint for WebSocket clients.
     * SockJS is enabled as a fallback for browsers without native WebSocket support.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // setAllowedOrigins is used because setAllowedOriginPatterns only exists from
        // Spring Framework 5.3, which is newer than the version behind Spring Boot 2.1.2
        registry.addEndpoint("/stomp").setAllowedOrigins("*").withSockJS();
    }

    /**
     * Configures the message broker that routes messages to their destinations.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Messages sent to clients go through /topic
        config.enableSimpleBroker("/topic");
        // Prefix for messages sent from the client to the server
        config.setApplicationDestinationPrefixes("/app");
    }
}

Define the main class SensorDataDashboard.java directly in com.bigdata.dashboard, outside the sub-packages listed above:

package com.bigdata.dashboard;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.cassandra.repository.config.EnableCassandraRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Main class of the Spring Boot dashboard for the IoT data.
 * Enables the Cassandra repositories and task scheduling, then starts the application.
 */
@SpringBootApplication
@EnableScheduling // Enables scheduled tasks (@Scheduled)
@EnableCassandraRepositories(basePackages = "com.bigdata.dashboard.repository") // Scans the Cassandra repositories
public class SensorDataDashboard {

    public static void main(String[] args) {
        SpringApplication.run(SensorDataDashboard.class, args); // Starts the Spring Boot application
    }
}

In the resources folder, define the configuration file application.properties:

# Cassandra configuration
spring.data.cassandra.keyspace-name=sensordatakeyspace
spring.data.cassandra.contact-points=localhost
spring.data.cassandra.port=9042
spring.data.cassandra.username=cassandra
spring.data.cassandra.password=cassandra

# Server configuration
server.port=3000

and the file iot-springboot.properties:

#Cassandra  properties
com.iot.app.cassandra.host=127.0.0.1
com.iot.app.cassandra.port=9042
com.iot.app.cassandra.keyspace=sensordatakeyspace

In the resources/static folder (as shown in the directory structure above), add the file index.html:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Big Data Dashboard</title>

    <!-- CSS -->
    <link rel="stylesheet"
          href="https://cdn.jsdelivr.net/npm/bootstrap@4.3.1/dist/css/bootstrap.min.css"
          integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T"
          crossorigin="anonymous">
    <style>
        h1 { text-align: center; }
        .container2 { background-color: #f8f9fa; }
        .container3 { margin: 0px 50px 20px 50px; height: 50%; }
        #chart_div_temperature, #chart_div_humidity { display: flex; justify-content: center; }
        * { box-sizing: border-box; }
        p { color: #757575; }
        .column { float: center; width: 50%; padding: 0px 40px 20px 40px; }
        .row { height: 10%; margin-top: 20px; }
    </style>

    <!-- JS Libraries -->
    <script src="js/jquery-1.12.4.min.js"></script>
    <script src="js/sockjs-1.1.1.min.js"></script>
    <script src="js/stomp.min.js"></script>
    <script type="text/javascript" src="https://www.gstatic.com/charts/loader.js"></script>

    <script type="text/javascript">
        // Initial values
        var temperature_avg = 20;
        var humidity_avg = 80;

        // WebSocket STOMP
        var socket = new SockJS('/stomp');
        var stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            console.log("Connected to STOMP");

            // Subscribe to average data
            stompClient.subscribe("/topic/average", function(data) {
                var resp = JSON.parse(data.body);
                temperature_avg = resp.temperature;
                humidity_avg = resp.humidity;
            });

            // Subscribe to real-time data
            stompClient.subscribe("/topic/data", function(data) {
                var resp = JSON.parse(data.body);
                temperature_avg = resp.temperature;
                humidity_avg = resp.humidity;
            });
        });

        // Load Google Charts
        google.charts.load('current', {'packages':['gauge','line']});
        google.charts.setOnLoadCallback(drawCharts);

        function drawCharts() {
            // --- GAUGES ---
            var dataGaugeTemp = google.visualization.arrayToDataTable([['Label','Value'],['Temperature', temperature_avg]]);
            var dataGaugeHum = google.visualization.arrayToDataTable([['Label','Value'],['Humidity', humidity_avg]]);

            var gaugeOptions = { width: 600, height: 180, redFrom: 90, redTo: 100, yellowFrom: 75, yellowTo: 90, minorTicks: 5 };
            var gaugeTemp = new google.visualization.Gauge(document.getElementById('chart_div_temperature'));
            var gaugeHum = new google.visualization.Gauge(document.getElementById('chart_div_humidity'));
            gaugeTemp.draw(dataGaugeTemp, gaugeOptions);
            gaugeHum.draw(dataGaugeHum, gaugeOptions);

            // --- LINE CHARTS ---
            function createLineChart(elementId, initialValue, label) {
                var data = new google.visualization.DataTable();
                data.addColumn('number', '15s');
                data.addColumn('number', label);

                var rows = Array.from({length:14}, (_,i) => [i+1, initialValue]);
                data.addRows(rows);

                var options = { chart:{legend:'none'}, width:600, height:225, axes:{x:{0:{side:'top'}}} };
                var chart = new google.charts.Line(document.getElementById(elementId));
                chart.draw(data, google.charts.Line.convertOptions(options));

                return { data: data, chart: chart, options: options };
            }

            var lineTempObj = createLineChart('line_top_temperature', 20, 'Temperature');
            var lineHumObj = createLineChart('line_top_humidity', 80, 'Humidity');

            function shiftData(dataTable, value) {
                for (let i=1; i<dataTable.getNumberOfRows(); i++){
                    dataTable.setValue(i-1,1,dataTable.getValue(i,1));
                }
                dataTable.setValue(dataTable.getNumberOfRows()-1,1,value);
            }

            // Update loop every 10s
            setInterval(function() {
                // Update line charts
                shiftData(lineTempObj.data, temperature_avg);
                shiftData(lineHumObj.data, humidity_avg);
                lineTempObj.chart.draw(lineTempObj.data, google.charts.Line.convertOptions(lineTempObj.options));
                lineHumObj.chart.draw(lineHumObj.data, google.charts.Line.convertOptions(lineHumObj.options));

                // Update gauges
                dataGaugeTemp.setValue(0,1,temperature_avg);
                dataGaugeHum.setValue(0,1,humidity_avg);
                gaugeTemp.draw(dataGaugeTemp, gaugeOptions);
                gaugeHum.draw(dataGaugeHum, gaugeOptions);
            }, 10000);
        }
    </script>
</head>

<body>
    <br/>
    <nav class="navbar navbar-expand-lg navbar-light">
        <a class="navbar-brand"><h1>Big Data Pipeline Dashboard</h1></a>
    </nav>

    <div class="container3">
        <div class="card container2">
            <!-- Line Charts Row -->
            <div class="row">
                <div class="column">
                    <div class="card"><div class="card-body"><div id="line_top_temperature"></div></div></div>
                </div>
                <div class="column">
                    <div class="card"><div class="card-body"><div id="line_top_humidity"></div></div></div>
                </div>
            </div>

            <!-- Gauges Row -->
            <div class="row">
                <div class="column">
                    <div class="card">
                        <div class="card-body">
                            <p>Average Temperature</p>
                            <div id="chart_div_temperature"></div>
                        </div>
                    </div>
                </div>
                <div class="column">
                    <div class="card">
                        <div class="card-body">
                            <p>Average Humidity</p>
                            <div id="chart_div_humidity"></div>
                        </div>
                    </div>
                </div>
            </div>
        </div>
    </div>
</body>
</html>

Create a js folder under resources/static and put in it the files that can be downloaded from this link:

https://drive.google.com/drive/folders/1voEsjOqJ4wN2Mg6lW97rCL-4rj7YuhwN?usp=drive_link

Follow the same execution steps as in Exercise 1 (the speed layer), changing only the command that submits the jar in the spark-master container. To start the stream processing, run:

docker exec spark-master /opt/bitnami/spark/bin/spark-submit --class tn.enit.tp4.processor.StreamProcessor --master spark://spark-master:7077 /spark-processor-1.0.0.jar

To start the batch processing, run:

docker exec spark-master /opt/bitnami/spark/bin/spark-submit --class tn.enit.tp4.processor.BatchProcessor --master spark://spark-master:7077 /spark-processor-1.0.0.jar

Finally, open your dashboard at:

localhost:3000