TP5 Lambda Architecture

Cinquième TP

By Mariem ZAOUALI

TP5 : Création d’application Big Data Scalable en suivant l’architecture Lambda

Objectif: Ce TP vient compléter les briques développées dans le TP précédent dans l’objectif de développer une application scalable se basant sur l’architecture Lambda. Nous avons déjà développé la partie Spark Streaming, l’ingestion des données avec Kafka et le stockage à l’aide de Cassandra.

Ajout du traitement en lots

Ajoutons au projet spark-processor, au niveau du package processor, la classe BatchProcessor.java:


import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;


public class BatchProcessor {
	
	public static void main(String[] args) throws Exception {

		String file = "spark-processor.properties";
		Properties prop = PropertyFileReader.readPropertyFile(file);
		SparkConf conf = ProcessorUtils.getSparkConf(prop);
		JavaSparkContext sc = new JavaSparkContext(conf);
		
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        
		String saveFile = prop.getProperty("com.iot.app.hdfs") + "iot-data";

		List<AverageData> average_data_list = ProcessorUtils.runBatch(sparkSession, saveFile);
		
		JavaRDD<AverageData> h = sc.parallelize(average_data_list, 1); // transform to RDD
		ProcessorUtils.saveAvgToCassandra(h);

		sparkSession.close();
		sparkSession.stop();

	}

}

Ajout de la couche Serving layer

Maintenant, nous allons créer l’application dashboard sur un IDE de votre choix. Commençons par pom.xml.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
    <groupId>com.bigdata.dashboard</groupId>
    <artifactId>dashboard</artifactId>
    <version>1.0.0</version>
    <name>Spring Boot Dashboard</name>

    <!-- Import dependency management from Spring Boot -->

    <properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-cassandra</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

    </dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
	</build>

</project>

Créez les packages suivants:

  • entity
  • repository
  • utils

Dans le package entity, créez les classes suivantes:

import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;

import java.io.Serializable;
import java.util.Date;

@Table("averagedata")
public class AverageData implements Serializable {

	@PrimaryKeyColumn(name = "id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
	private String id;

	@Column(value = "temperature")
	private double temperature;

	@Column(value = "humidity")
	private double humidity;

	public String getId() {
		return id;
	}

	public double getTemperature() {
		return temperature;
	}

	public double getHumidity() {
		return humidity;
	}


	@Override
	public String toString() {
		return "AverageData [id=" + id + ", temperature=" + temperature + ", humidity=" + humidity  +" ]";
	}

}

Puis, la classe suivante:


import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;

import java.io.Serializable;
import java.util.Date;

@Table("humidity")
public class Humidity implements Serializable {
	
	@PrimaryKeyColumn(name = "id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
	private String id;

	@Column(value = "value")
	private double value;
	
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
	@Column(value = "timestamp")
	private Date timeStamp;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public double getValue() {
		return value;
	}

	public void setValue(double value) {
		this.value = value;
	}

	public Date getTimeStamp() {
		return timeStamp;
	}

	public void setTimeStamp(Date timeStamp) {
		this.timeStamp = timeStamp;
	}

	@Override
	public String toString() {
		return "Humidity [id=" + id + ", value=" + value + ", timeStamp=" + timeStamp + "]";
	}

}

Puis, la classe:



import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;

import java.io.Serializable;
import java.util.Date;


@Table("temperature")
public class Temperature implements Serializable{
	
	@PrimaryKeyColumn(name = "id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
	private String id;

	@Column(value = "value")
	private double value;
	
	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "MST")
	@Column(value = "timestamp")
	private Date timeStamp;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public double getValue() {
		return value;
	}

	public void setValue(double value) {
		this.value = value;
	}

	public Date getTimeStamp() {
		return timeStamp;
	}

	public void setTimeStamp(Date timeStamp) {
		this.timeStamp = timeStamp;
	}

	@Override
	public String toString() {
		return "Temperature [id=" + id + ", value=" + value + ", timeStamp=" + timeStamp + "]";
	}
	
	
}

Dans le package repository, créez les classes suivantes:



import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;

import com.bigdata.dashboard.entity.AverageData;


@Repository
public interface AverageDataRepository extends CassandraRepository<AverageData,String>{
	
	@Query("SELECT * FROM sensordatakeyspace.averagedata")
	 AverageData find();
	

}

Puis , la classe suivante:



import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;

import com.bigdata.dashboard.entity.Humidity;

import java.util.Date;
import java.util.UUID;


@Repository
public interface HumidityRepository extends CassandraRepository<Humidity,UUID>{
	
	@Query("SELECT * FROM sensordatakeyspace.humidity WHERE timestamp > ?0 ALLOW FILTERING")
	 Iterable<Humidity> findHumidityByDate(Date date);

}

Puis la classe suivante:



import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.stereotype.Repository;

import com.bigdata.dashboard.entity.Temperature;

import java.util.Date;
import java.util.UUID;

@Repository
public interface TemperatureRepository extends CassandraRepository<Temperature, UUID>{

	 @Query("SELECT * FROM sensordatakeyspace.temperature WHERE timestamp > ?0 ALLOW FILTERING")
	 Iterable<Temperature> findTemperatureByDate(Date date);
}

Le package utils, la classe AverageService.java:


import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;


@Service
public class AverageService {

	@Autowired
	private SimpMessagingTemplate template;

	@Autowired
	private AverageDataRepository averageDataRepository;

	// Method sends data message in every 60 seconds.
	@Scheduled(fixedRate = 15000)
	public void trigger() {

		Long time = new Date().getTime();
		Date date = new Date(time - time % ( 60 * 1000)); // get data from the last minute

		AverageData data = averageDataRepository.find();
		System.out.println(data);

		double temperature = data.getTemperature();
		System.out.println(temperature);

		double humidity = data.getHumidity();
		System.out.println(humidity);
		// prepare response
		Response response = new Response();
		response.setHumidity(humidity);
		response.setTemperature(temperature);

		// send to ui
		this.template.convertAndSend("/topic/average", response);
	}

}

la classe DataService.java :


import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;



/**
 * Service class to send data messages to dashboard ui at fixed interval using
 * web-socket.
 */
@Service
public class DataService {

	@Autowired
	private SimpMessagingTemplate template;

	@Autowired
	private TemperatureRepository temperatureRepository;

	@Autowired
	private HumidityRepository humidityRepository;

	// Method sends data message in every 10 seconds.
	@Scheduled(fixedRate = 10000)
	public void trigger() {
		System.out.println("triggered");
		List<Double> temperatures = new ArrayList<>();
		List<Double> humidities = new ArrayList<>();

		Long time = new Date().getTime();
		Date date = new Date(time - time % ( 60 * 1000)); // get data from the last minute
		//Date date = new Date(time - time % (2 * 24 * 60 * 60 * 1000));

		temperatureRepository.findTemperatureByDate(date).forEach(e -> temperatures.add(e.getValue()));
		humidityRepository.findHumidityByDate(date).forEach(e -> humidities.add(e.getValue()));

		// temperatureRepository.findTemperatureByDate(date).forEach(e ->
		// temperatures.add(e.getValue()));
		// humidityRepository.findHumidityByDate(date).forEach(e ->
		// humidities.add(e.getValue()));

		double temperature = temperatures.size() > 0 ? temperatures.get(temperatures.size() - 1) : 20;
		double humidity = humidities.size() > 0 ? humidities.get(humidities.size() - 1) : 80;

		// prepare response
		Response response = new Response();
		response.setHumidity(humidity);
		response.setTemperature(temperature);

		// send to ui
		this.template.convertAndSend("/topic/data", response);
	}

}

La classe Response.java:



import java.io.Serializable;
import java.util.List;



public class Response implements Serializable {
    private double temperature;
    private double humidity;

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public double getHumidity() {
        return humidity;
    }

    public void setHumidity(double humidity) {
        this.humidity = humidity;
    }

   
}

La classe WebSocketConfig.java :

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * Web-Socket message broker configuration class to send data using SockJS
 * to dashboard html page.
 */

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    //sockJS can get message using this endpoint
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp").withSockJS();
    }

    //configure message broker
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
    }
}

Définissez la classe main.java hors des packages susmentionnés:



import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.cassandra.repository.config.EnableCassandraRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;



@SpringBootApplication
@EnableScheduling
@EnableCassandraRepositories("com.bigdata.dashboard.repository")
public class SensorDataDashboard  {
    public static void main(String[] args) {
        SpringApplication.run(SensorDataDashboard.class, args);
    }
}

Dans le dossier ressources, définissez le fichier de configuration application.properties:

spring.data.cassandra.keyspace-name= sensordatakeyspace
spring.data.cassandra.port= 9042
spring.data.cassandra.password=cassandra
spring.data.cassandra.contact-points=localhost
server.port = 3000
spring.data.cassandra.username= cassandra

et le fichier iot-springboot.properties

#Cassandra  properties
com.iot.app.cassandra.host=127.0.0.1
com.iot.app.cassandra.port=9042
com.iot.app.cassandra.keyspace=sensordatakeyspace

Dans le même dossier resources, ajoutez le fihcier index.html:


<html>
<head>
<script type="text/javascript" src="js/jquery-1.12.4.min.js"></script>
<script type="text/javascript" src="js/sockjs-1.1.1.min.js"></script>
<script type="text/javascript" src="js/stomp.min.js"></script>
<script type="text/javascript"
	src="https://www.gstatic.com/charts/loader.js"></script>
<link rel="stylesheet"
	href="https://cdn.jsdelivr.net/npm/bootstrap@4.3.1/dist/css/bootstrap.min.css"
	integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T"
	crossorigin="anonymous">
<style type="text/css">
h1 {
	text-align: center;
}

.title {
	margin-left: 20px;
}

.container2 {
	background-color: #f8f9fa;
}

.container3 {
	margin: 0px 50px 20px 50px;
	height: 50%;
}

#chart_div_temperature {
	display: flex;
	justify-content: center;
}

#chart_div_humitidy {
	display: flex;
	justify-content: center;
}

* {
	box-sizing: border-box;
}

p {
	color: #757575;
}

/* Create two equal columns that floats next to each other */
.column {
	float: center;
	width: 50%;
	padding: 0px 40px 20px 40px;
}

.row {
	height: 10%;
	margin-top: 20px;
}
</style>

<script type="text/javascript"
	src="https://www.gstatic.com/charts/loader.js"></script>

<script type="text/javascript">
	google.charts.load('current', {
		'packages' : [ 'gauge' ]
	});
	google.charts.setOnLoadCallback(drawChartTemperature);

	var temperature_avg = 20;
	var humidity_avg = 80;

	var socket = new SockJS('/stomp');
	var stompClient = Stomp.over(socket);
	stompClient.connect({}, function(frame) {
		console.log("connnect");
		stompClient.subscribe("/topic/average", function(data) {
			var dataList = data.body;
			var resp = jQuery.parseJSON(dataList);

			temperature_avg = resp.temperature;
			humidity_avg = resp.humidity;

			console.log("--------", temperature_avg)
		});
	})

	function drawChartTemperature() {

		var dataTemperature = google.visualization.arrayToDataTable([
				[ 'Label', 'Value' ], [ 'Temperature', 20 ]

		]);
		var dataHumidity = google.visualization.arrayToDataTable([
				[ 'Label', 'Value' ], [ 'Humidity', 80 ]

		]);

		var options = {
			width : 600,
			height : 180,
			padding : 500,
			redFrom : 90,
			redTo : 100,
			yellowFrom : 75,
			yellowTo : 90,
			minorTicks : 5
		};

		var chart = new google.visualization.Gauge(document
				.getElementById('chart_div_temperature'));

		chart.draw(dataTemperature, options);

		var chart1 = new google.visualization.Gauge(document
				.getElementById('chart_div_humitidy'));

		chart1.draw(dataHumidity, options);

		setInterval(function() {
			console.log("helloooo", temperature_avg)
			dataTemperature.setValue(0, 1, temperature_avg);
			dataHumidity.setValue(0, 1, humidity_avg);
			chart.draw(dataTemperature, options);
			chart1.draw(dataHumidity, options);
		}, 500);

	}
</script>

<script type="text/javascript">
	var temperature = 20;
	var humidity = 80;

	var socket = new SockJS('/stomp');
	var stompClient = Stomp.over(socket);
	stompClient.connect({}, function(frame) {
		console.log("connnect");
		//subscribe "/topic/trafficData" message
		stompClient.subscribe("/topic/data", function(data) {
			var dataList = data.body;
			var resp = jQuery.parseJSON(dataList);
			temperature = resp.temperature;
			humidity = resp.humidity;

		});
	})

	function decaler(data, a) {
		for (let i = 1; i < data.getNumberOfRows(); i++) {
			data.setValue(i - 1, 1, data.getValue(i, 1));
		}
		data.setValue(data.getNumberOfRows() - 1, 1, a);
		return data;
	}

	google.charts.load('current', {
		'packages' : [ 'line' ]
	});

	google.charts.setOnLoadCallback(drawChart);

	function drawChart() {
		console.log("temperature dans draw chart");
		console.log(temperature);

		var data = new google.visualization.DataTable();
		data.addColumn('number', '15s');
		data.addColumn('number', 'Humidity');
		tab = [ [ 1, 80 ], [ 2, 80 ], [ 3, 80 ], [ 4, 80 ], [ 5, 80 ],
				[ 6, 80 ], [ 7, 80 ], [ 8, 80 ], [ 9, 80 ], [ 10, 80 ],
				[ 11, 80 ], [ 12, 80 ], [ 13, 80 ], [ 14, 80 ] ]

		data.addRows(tab);

		var options = {
			chart : {
				title : 'Current humidity',
				legend : 'none',
			},
			width : 600,
			height : 225,
			axes : {
				x : {
					0 : {
						side : 'top'
					}
				}
			}
		};

		var chart = new google.charts.Line(document
				.getElementById('line_top_humidity'));

		chart.draw(data, google.charts.Line.convertOptions(options));

		setInterval(function() {

			data = decaler(data, humidity)
			chart.draw(data, google.charts.Line.convertOptions(options));
		}, 10000);

	}
	google.charts.load('current', {
		'packages' : [ 'line' ]
	});
	google.charts.setOnLoadCallback(drawChart);
</script>
<script type="text/javascript"
	src="https://www.gstatic.com/charts/loader.js"></script>
<script type="text/javascript">
	var temperature = 20;
	var humidity = 80;

	console.log("button");
	var socket = new SockJS('/stomp');
	var stompClient = Stomp.over(socket);
	stompClient.connect({}, function(frame) {
		console.log("connnect");
		//subscribe "/topic/trafficData" message
		stompClient.subscribe("/topic/data", function(data) {
			var dataList = data.body;
			var resp = jQuery.parseJSON(dataList);
			temperature = resp.temperature;
			humidity = resp.humidity;

		});
	})

	function decaler(data, a) {
		for (let i = 1; i < data.getNumberOfRows(); i++) {
			data.setValue(i - 1, 1, data.getValue(i, 1));
		}
		data.setValue(data.getNumberOfRows() - 1, 1, a);
		return data;
	}

	function drawChart() {
		console.log("temperature dans draw chart");
		console.log(temperature);

		var data = new google.visualization.DataTable();
		data.addColumn('number', '15s');
		data.addColumn('number', 'Temperature');
		tab = [ [ 1, 20 ], [ 2, 20 ], [ 3, 20 ], [ 4, 20 ], [ 5, 20 ],
				[ 6, 20 ], [ 7, 20 ], [ 8, 20 ], [ 9, 20 ], [ 10, 20 ],
				[ 11, 20 ], [ 12, 20 ], [ 13, 20 ], [ 14, 20 ] ]

		data.addRows(tab);

		var options = {
			chart : {
				title : 'Current Temperature'
			},
			width : 600,
			height : 225,
			axes : {
				x : {
					0 : {
						side : 'top'
					}
				}
			}
		};

		var chart = new google.charts.Line(document
				.getElementById('line_top_temperature'));

		chart.draw(data, google.charts.Line.convertOptions(options));

		setInterval(function() {
			console.log("temperature dans set interrr");
			//console.log(temperature)
			data = decaler(data, temperature)
			chart.draw(data, google.charts.Line.convertOptions(options));
		}, 10000);

	}
	google.charts.load('current', {
		'packages' : [ 'line' ]
	});
	google.charts.setOnLoadCallback(drawChart);
</script>
</head>
<body>
<br/>
	<nav class="navbar navbar-expand-lg navbar-light">
		<a class="navbar-brand">
			<h1>Big Data Pipeline</h1>
		</a>
	</nav>
	<div class="container3">
		<div class="card container2">
			<div class="row">
				<div class="column">
					<div class="card">
						<div class="card-body">
							<div id="line_top_temperature"></div>
						</div>
					</div>

				</div>
				<div class="column">
					<div class="card">
						<div class="card-body">
							<div id="line_top_humidity"></div>
						</div>
					</div>

				</div>
			</div>
			<div class="row">
				<div class="column">
					<div class="card">
						<div class="card-body">
							<p>Average Temperature</p>
							<div id="chart_div_temperature"></div>
						</div>
					</div>
				</div>

				<div class="column">
					<div class="card">
						<div class="card-body">
							<p>Average Humidity</p>
							<div id="chart_div_humitidy"></div>
						</div>
					</div>
				</div>
			</div>
		</div>
	</div>

</body>
</html>

Créer un dossier js et mettez dedans les fichiers téléchargeables à partir de ce lien:

https://drive.google.com/drive/folders/1voEsjOqJ4wN2Mg6lW97rCL-4rj7YuhwN?usp=drive_link

Suivez les mêmes étapes d’exécution que le TP précédent, en modifiant seulement la commande pour lancer le jar dans le container spark-master pour lancer le Stream processing:

docker exec spark-master /spark/bin/spark-submit  --class tn.enit.tp4.processor.StreamProcessor  --master spark://localhost:7077 /spark-processor-1.0.0.jar

et pour lancer le batch processing, lancez la commande:

docker exec spark-master /spark/bin/spark-submit --class tn.enit.tp4.processor.BatchProcessor --master spark://localhost:7077 /spark-processor-1.0.0.jar

Finalement, accédez à votre dashboard sur :

localhost:3000

A vous de jouer

Cahier de charges pour le projet final