TP3 : Traitement en lots avec Apache Spark
Objectifs du TP : Au terme de ce TP, vous seriez capable de :
- Manipuler les structures de Spark à savoir RDD et Dataframes
- Tester Spark en mode cluster sans YARN
- Implémenter la pipeline HDFS, Spark, Hive et Sqoop pour effectuer un batch processing
Manipulation 1: Mettre à jour le docker-compose
Dans votre docker-compose.yml, vous rajoutez ces lignes sous la section services:
- 1.Un spark-master
- 2.Deux spark workers
spark-master:
image: bde2020/spark-master:3.1.1-hadoop3.2
container_name: spark-master
depends_on:
- namenode
- datanode
ports:
- "8079:8080"
- "7077:7077"
environment:
- INIT_DAEMON_STEP=setup_spark
- HADOOP_CONF_DIR=hdfs://namenode:9000
spark-worker-1:
image: bde2020/spark-worker:3.1.1-hadoop3.2
container_name: spark-worker-1
depends_on:
- spark-master
ports:
- "8081:8081"
environment:
- "SPARK_MASTER=spark://spark-master:7077"
- CORE_CONF_fs_defaultFS=hdfs://namenode:9000
spark-worker-2:
image: bde2020/spark-worker:3.1.1-hadoop3.2
container_name: spark-worker-2
depends_on:
- spark-master
ports:
- "8082:8081"
environment:
- "SPARK_MASTER=spark://spark-master:7077"
- CORE_CONF_fs_defaultFS=hdfs://namenode:9000
Passez maintenant à la mise en place du réseau. Placez-vous sous le répertoire enit_lab_big_data ou autre là où vous avez votre docker-compose.xml et lancez :
docker compose up -d
Pour supprimer le réseau, lancez la commande :
docker compose down
⚠️ Attention
Il se trouve que certains de vos amis n’arrivent pas à monter le réseau Docker après avoir lancédocker compose up -dà cause d’un conteneur qui ne veut pas s’arrêter.
Lancez dans ce cas :systemctl restart docker
Manipulation 2: Manipulation de RDD sur pyspark
Après avoir lancé la commande de docker-compose up -d, nous créons un fichier à analyser à l’aide PySpark. Pour ce faire, lancez la commande :
echo "Hello world spark! Testing spark with pyspark! Hello spark" > input.txt
Maintenant, copiez ce fichier sur votre conteneur spark-master :
docker cp input.txt spark-master:/root/input.txt
Connectez-vous au conteneur spark-master :
docker exec -it spark-master bash
Vous lancez ainsi la commande suivante pour accéder à un console interactif
/spark/bin/pyspark
Tapez cette ligne pour créer une Session Spark et lancez votre traitement :
spark = SparkSession.builder.appName("Lecture de fichiers texte").getOrCreate()
Tapez:
docs = spark.read.text("/root/input.txt")
lower = docs.rdd.map(lambda line: line.value.lower())
words = lower.flatMap(lambda line: line.split(" "))
counts = words.map(lambda word: (word, 1))
freq = counts.reduceByKey(lambda x, y: x + y)
freq_swapped = freq.map(lambda x: (x[1], x[0]))
top = freq_swapped.takeOrdered(3, key=lambda x: -x[0])
Visualisez le contenu de la variable top:
top
Nous testons maintenant un autre moyen pour lancer un traitement écrit en python sur Spark. Créez en dehors des conteneurs le fichier wordcount.py.
from pyspark.sql import SparkSession
# Initialise SparkSession
spark = SparkSession.builder.appName("wordcount_py").getOrCreate()
# Crée un DataFrame avec les données
data = [
("Electronics", 1000),
("Clothing", 800),
("Electronics", 1200),
("Books", 300),
("Clothing", 600),
("Electronics", 900),
("Books", 500),
("Clothing", 700),
("Books", 400)
]
columns = ["category", "amount"]
df = spark.createDataFrame(data, columns)
# Apply batch transformations or analytics
result = df.groupBy("category").count()
# Collect top N records as per the requirement using DataFrame operations
top_records_df = result.orderBy(result['count'].desc()).limit(10)
# Show the top records DataFrame
top_records_df.show()
# Write the batch processed data to an output directory
top_records_df.write.csv("output") # Saving as a directory 'output'
Copiez ce fichier dans le conteneur spark-master:
docker cp wordcount.py spark-master:/wordcount.py
Maintenant connectez-vous au conteneur spark-master :
docker exec -it spark-master bash
Puis lancez:
/spark/bin/spark-submit --master local --driver-memory 4g --executor-memory 2g --executor-cores 2 --py-files wordcount.py wordcount.py
Affichez le résultat qui se trouve sous output avec la commande cat! Ca devrait ressembler à ce résultat :
Pour découvrir toutes les manipulations possibles avec pyspark (sur les RDD et Dataframes) expliquées en images, veuillez accéder à ce lien :
https://github.com/jkthompson/pyspark-pictures
Manipulation 3: Batch processing: Wordcount Java
Pour éviter les problèmes inhérents à la charge de l’IDE Intellij, nous créons cette fois-ci notre projet à l’aide du terminal. Mais pour ce faire, veuillez installer maven à l’aide de cette commande :
apt get install maven
Maintenant, créez un projet java mavenisé en lançant la commande suivante :
mvn archetype:generate -DgroupId=tn.enit.tp1 -DartifactId=wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Nous allons exécuter l’application Wordcount avec Spark. Créez un projet Wordcount à l’aide de votre IDE.
Votre fichier pom.xml doit ressembler à celà:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tn.enit.tp1</groupId>
<artifactId>Wordcount</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<name>Wordcount</name>
<url>http://maven.apache.org</url>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version> <!-- Or the appropriate version -->
</dependency>
</dependencies>
</project>
Créez un fichier sous le nom Wordcount.java et copiez son contenu :
package tn.enit.tp1;
import com.google.common.base.Preconditions;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Arrays;
public class Wordcount {
private static final Logger LOGGER = LoggerFactory.getLogger(Wordcount.class);
public static void main(String[] args) {
Preconditions.checkArgument(args.length > 1, "Please provide the path of input file and output dir as parameters.");
new Wordcount().run(args[0], args[1]);
}
public void run(String inputFilePath, String outputDir) {
SparkConf conf = new SparkConf()
.setAppName(Wordcount.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFile = sc.textFile(inputFilePath);
JavaPairRDD<String, Integer> counts = textFile
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
counts.saveAsTextFile(outputDir);
}
}
Si vous êtes sous IntelliJ, vous pouvez lancer l’étape clean puis package du Lifecycle à l’aide du menu Maven situé par défaut au coin supérieur droite de votre écran.
Sinon, positionnez vous à l’emplacement du pom.xml et lancez la commande:
mvn clean package
L’exécution réussie de cette commande va vous générer sous le dossier /target/ le fichier wordcount1.0.jar. Vous allez copier ce jar vers le conteneur spark-master :
sudo docker cp target/Wordcount-1.0.jar spark-master:/Wordcount-1.0.jar
Nous générons ensuite le fichier sur lequel nous exécutons le code Wordcount.
echo "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." > loremipsum.txt
Copiez-le vers le conteneur spark-master :
sudo docker cp loremipsum.txt spark-master:/loremipsum.txt
Connectez-vous sur le conteneur spark-master :
sudo docker exec -it spark-master bash
Lancez finalement le traitement sur spark en mode cluster mais sans cluster manager spécifique:
/root/spark/bin/spark-submit --class tn.enit.tp1.Wordcount --master local Wordcount-1.0.jar loremipsum.txt output
Le résultat que vous attendez ressemblera à cela :
Manipulation 4: Batch processing and serving layer
Nous abordons dans cette partie, une étude de cas de traitement en lots (batch processing) des données d’une chaîne de restauration. Nous disposons de deux fichiers
- Restaurant_details.csv a pour champs : id,restaurant_name,category,esimated_cooking_time,latitude,longitude
- Order_details.csv a pour champs : order_created_timestamp,status,price,discount,id,driver_id,user_id,restaurant_id
Nous voulons à travers l’étude des données massives enregistrées extraire les réponses suivantes :
- 1.Le temps moyen de traitement de chaque catégorie de plat2.
- 2.Le top 3 des restaurants ayant un temps de préparation de plat réduit
Pour ce faire, nous allons adopter l’architecture de système illustré par la figure 1 à travers ce tp. Si ce n’est pas déjà fait, lancez votre réseau de conteneur du Lab3.
Postgres
Connectez-vous au conteneur hive-server. Une fois connecté, nous allons lancer un script pour configurer sqoop.
chmod 7 start-hive.sh
Puis lancez :
./start-hive.sh
Créez avec hive cli la base de données lab3_big_data. Accédez au container de postgres avec la commande :
sudo docker exec -it postgres-db psql -U postgres
Affichez toutes les bases de données que vous avez à bord avec la commande :
\l
Vous allez accéder à la base « temp_db » avec la commande :
\c temp_db
Vous pouvez vérifier s’il y a des tables dans la base :
\dt
Nous allons créer une table qui s’appelle order_detail que nous chargerons d’un fichier csv. Supprimez la table si elle existe.
DROP TABLE IF EXISTS order_detail;
Puis créez la table:
CREATE TABLE IF NOT EXISTS order_detail (
order_created_timestamp TIMESTAMP,
status VARCHAR,
price INT,
discount FLOAT,
id VARCHAR PRIMARY KEY NOT NULL,
driver_id VARCHAR,
user_id VARCHAR,
restaurant_id VARCHAR
);
Puis lancez
COPY order_detail FROM '/var/lib/postgresql/data/lmwn/order_detail.csv' DELIMITER ',' CSV HEADER;
Maintenant, nous chargerons les données qui proviennent du fichier restaurant_detail.csv . Lancez:
DROP TABLE IF EXISTS restaurant_detail;
Puis:
CREATE TABLE IF NOT EXISTS restaurant_detail (
id VARCHAR PRIMARY KEY NOT NULL,
restaurant_name VARCHAR,
category VARCHAR,
estimated_cooking_time FLOAT,
latitude FLOAT,
longitude FLOAT
);
Ensuite, lancez:
COPY restaurant_detail FROM '/var/lib/postgresql/data/lmwn/restaurant_detail.csv' DELIMITER ',' CSV HEADER;
Quittez postgres en tapant
\q
Hive et Sqoop
Nous allons maintenant importer les données de la table que nous venons de créer sur notre RDBMS vers Hive à l’aide de Sqoop. Tapez la commande suivante en se connectant au conteneur hive-server pour importer la table order_detail:
/usr/lib/sqoop/bin/sqoop import --connect jdbc:postgresql://database:5432/temp_db --table order_detail --username postgres --password passw0rd --hive-import --hive-table lab3_big_data.order_detail --num-mappers 1 --map-column-hive order_created_timestamp=STRING,status=STRING,price=INT,discount=FLOAT,id=STRING,driver_id=STRING,user_id=STRING,restaurant_id=STRING
Tapez la commande suivante pour importer la table restaurant_detail:
/usr/lib/sqoop/bin/sqoop import --connect jdbc:postgresql://database:5432/temp_db --table restaurant_detail --username postgres --password passw0rd --hive-import --hive-table lab3_big_data.restaurant_detail --num-mappers 1 --map-column-hive id=STRING,restaurant_name=STRING,category=STRING,estimated_cooking_time=FLOAT,latitude=FLOAT,longitude=FLOAT
Connectez-vous à hive-cli:
sudo docker exec -it hive-server bash
Créez la table order_detail_hive sous la base default.
DROP TABLE IF EXISTS order_detail_hive;
CREATE EXTERNAL TABLE IF NOT EXISTS order_detail_hive (
order_created_timestamp TIMESTAMP,
status STRING,
price INT,
discount FLOAT,
id STRING,
driver_id STRING,
user_id STRING,
restaurant_id STRING
)
STORED AS PARQUET
LOCATION '/user/spark/order_detail_hive';
Vérifiez si vos données ont été migrées vers Hive.
SELECT * FROM order_detail_hive LIMIT 5;
Créez la table restaurant_detail sous la base default (ca veut dire ne choisissez aucune base dès que vous vous connectez à hive-cli):
DROP TABLE IF EXISTS restaurant_detail_hive;
Puis:
CREATE EXTERNAL TABLE IF NOT EXISTS restaurant_detail_hive (
id STRING,
restaurant_name STRING,
category STRING,
estimated_cooking_time FLOAT,
latitude FLOAT,
longitude FLOAT
)
STORED AS PARQUET
LOCATION '/user/spark/restaurant_detail_hive';
Vérifiez l’état du transfert:
SELECT * FROM restaurant_detail_hive LIMIT 5;
Nous effectuons ici un nettoyage des données sur les deux tables :
INSERT OVERWRITE TABLE order_detail_hive
SELECT
order_created_timestamp,
status,
price,
COALESCE(discount, 0.0) AS discount,
id,
driver_id,
user_id,
restaurant_id
FROM
lab3_big_data.order_detail;
INSERT OVERWRITE TABLE restaurant_detail_hive
SELECT
id,
restaurant_name,
category,
CASE
WHEN estimated_cooking_time BETWEEN 10 AND 40 THEN 1
WHEN estimated_cooking_time BETWEEN 41 AND 80 THEN 2
WHEN estimated_cooking_time BETWEEN 81 AND 120 THEN 3
WHEN estimated_cooking_time > 120 THEN 4
ELSE NULL
END AS cooking_time_category,
latitude,
longitude
FROM
lab3_big_data.restaurant_detail;
Batch view
Déconnectez-vous du conteneur hive-server. Sous le dossier enit_lab_big_data, créez le fichier RestaurantProcessingTime.py.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, desc, asc
spark = SparkSession.builder \
.appName("Restaurant Processing Time") \
.enableHiveSupport() \
.config("spark.sql.parquet.writeLegacyFormat",True)\
.getOrCreate()
# Charger les tables depuis Hive
restaurantDetail = spark.read.parquet("hdfs://namenode:9000/user/spark/restaurant_detail_hive")
orderDetail = spark.read.parquet("hdfs://namenode:9000/user/spark/order_detail_hive")
# Calculer le temps moyen de traitement pour chaque catégorie de plat
avgProcessingTimePerCategory = orderDetail.join(restaurantDetail, orderDetail["restaurant_id"] == restaurantDetail["id"]) \
.groupBy("category") \
.agg(avg("estimated_cooking_time").alias("avg_processing_time")) \
.orderBy(desc("avg_processing_time"))
# Afficher le temps moyen de traitement pour chaque catégorie de plat
print("Temps moyen de traitement pour chaque catégorie de plat :")
avgProcessingTimePerCategory.show()
# Calculer le top 3 des restaurants ayant un temps de préparation réduit
top3Restaurants = restaurantDetail.join(orderDetail, orderDetail["restaurant_id"] == restaurantDetail["id"]) \
.groupBy("restaurant_name") \
.agg(avg("estimated_cooking_time").alias("avg_cooking_time")) \
.orderBy(asc("avg_cooking_time")) \
.limit(3)
# Afficher le top 3 des restaurants ayant un temps de préparation réduit
print("Top 3 des restaurants avec un temps de préparation réduit :")
top3Restaurants.show()
# Enregistrer le temps moyen de traitement par catégorie dans un fichier CSV
avgProcessingTimePerCategory.write.csv("/avg_processing_time_per_category.csv")
# Enregistrer le top 3 des restaurants dans un fichier CSV
top3Restaurants.write.csv("/top_3_restaurants.csv")
spark.stop()
Copiez avec docker ce fichier dans le conteneur spark-master. Puis, accédez au conteneur spark-master.
sudo docker exec -it spark-master bash
Lancez la commande suivante pour soumettre le travail au spark driver.
/spark/bin/spark-submit --master local --driver-memory 4g --executor-memory 2g --executor-cores 2 --py-files RestaurantProcessingTime.py RestaurantProcessingTime.py
Vous devez avoir deux répertoires top_3_restaurants.csv et avg_processing_time_per_category.csv. Tapez à chaque fois la commande suivante :
cd top_3_restaurants.csv && cat part-*
cd avg_processing_time_per_category.csv && cat part-*
A vous de jouer
Chargez les fichiers générés par Spark dans une RDBMS (ou utilisez les directement) pour les afficher dans une interface UI web du langage de votre choix.
Vous appliquez la manipulation 4 sur vos données pour faire un batch processing et un serving Layer en utilisant Spark, Hive, Postgres, Sqoop et Web UI.