TP3 Manipulation avec Spark

Troisième TP

By Mariem ZAOUALI

TP3 : Traitement en lots avec Apache Spark

Objectifs du TP : Au terme de ce TP, vous seriez capable de :

  • Manipuler les structures de Spark à savoir RDD et Dataframes
  • Tester Spark en mode cluster sans YARN
  • Implémenter la pipeline HDFS, Spark, Hive et Sqoop pour effectuer un batch processing

Manipulation 1: Mettre à jour le docker-compose

Dans votre docker-compose.yml, vous rajoutez ces lignes sous la section services:

  • 1.Un spark-master
  • 2.Deux spark workers
spark-master:
        image: bde2020/spark-master:3.1.1-hadoop3.2
        container_name: spark-master
        depends_on:
          - namenode
          - datanode
        ports:
            - "8079:8080"
            - "7077:7077"
        environment:
            - INIT_DAEMON_STEP=setup_spark
            - HADOOP_CONF_DIR=hdfs://namenode:9000
            
  spark-worker-1:
        image: bde2020/spark-worker:3.1.1-hadoop3.2
        container_name: spark-worker-1
        depends_on:
            - spark-master
        ports:
            - "8081:8081"
        environment:
            - "SPARK_MASTER=spark://spark-master:7077"
            - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
            
  spark-worker-2:
        image: bde2020/spark-worker:3.1.1-hadoop3.2
        container_name: spark-worker-2
        depends_on:
            - spark-master
        ports:
            - "8082:8081"
        environment:
            - "SPARK_MASTER=spark://spark-master:7077" 
            - CORE_CONF_fs_defaultFS=hdfs://namenode:9000

Passez maintenant à la mise en place du réseau. Placez-vous sous le répertoire enit_lab_big_data ou autre là où vous avez votre docker-compose.xml et lancez :

docker compose up -d

Pour supprimer le réseau, lancez la commande :

docker compose down

⚠️ Attention
Il se trouve que certains de vos amis n’arrivent pas à monter le réseau Docker après avoir lancé docker compose up -d à cause d’un conteneur qui ne veut pas s’arrêter.
Lancez dans ce cas :

systemctl restart docker

Manipulation 2: Manipulation de RDD sur pyspark

Après avoir lancé la commande de docker-compose up -d, nous créons un fichier à analyser à l’aide PySpark. Pour ce faire, lancez la commande :

echo "Hello world spark! Testing spark with pyspark! Hello spark" > input.txt

Maintenant, copiez ce fichier sur votre conteneur spark-master :

docker cp input.txt spark-master:/root/input.txt

Connectez-vous au conteneur spark-master :

docker exec -it spark-master bash

Vous lancez ainsi la commande suivante pour accéder à un console interactif

/spark/bin/pyspark

Tapez cette ligne pour créer une Session Spark et lancez votre traitement :

spark = SparkSession.builder.appName("Lecture de fichiers texte").getOrCreate()

Tapez:

docs = spark.read.text("/root/input.txt")                                            

image

lower = docs.rdd.map(lambda line: line.value.lower())

image

words = lower.flatMap(lambda line: line.split(" "))

image

counts = words.map(lambda word: (word, 1))

image

freq = counts.reduceByKey(lambda x, y: x + y)

image

 freq_swapped = freq.map(lambda x: (x[1], x[0]))

image

 top = freq_swapped.takeOrdered(3, key=lambda x: -x[0])

Visualisez le contenu de la variable top:

 top

Nous testons maintenant un autre moyen pour lancer un traitement écrit en python sur Spark. Créez en dehors des conteneurs le fichier wordcount.py.

from pyspark.sql import SparkSession

# Initialise SparkSession
spark = SparkSession.builder.appName("wordcount_py").getOrCreate()

# Crée un DataFrame avec les données
data = [
    ("Electronics", 1000),
    ("Clothing", 800),
    ("Electronics", 1200),
    ("Books", 300),
    ("Clothing", 600),
    ("Electronics", 900),
    ("Books", 500),
    ("Clothing", 700),
    ("Books", 400)
]

columns = ["category", "amount"]
df = spark.createDataFrame(data, columns)

# Apply batch transformations or analytics
result = df.groupBy("category").count()

# Collect top N records as per the requirement using DataFrame operations
top_records_df = result.orderBy(result['count'].desc()).limit(10)

# Show the top records DataFrame
top_records_df.show()

# Write the batch processed data to an output directory
top_records_df.write.csv("output")  # Saving as a directory 'output'

Copiez ce fichier dans le conteneur spark-master:

docker cp wordcount.py spark-master:/wordcount.py

Maintenant connectez-vous au conteneur spark-master :

docker exec -it spark-master bash

Puis lancez:

/spark/bin/spark-submit --master local --driver-memory 4g --executor-memory 2g --executor-cores 2 --py-files wordcount.py  wordcount.py

Affichez le résultat qui se trouve sous output avec la commande cat! Ca devrait ressembler à ce résultat :

Description of the image

Pour découvrir toutes les manipulations possibles avec pyspark (sur les RDD et Dataframes) expliquées en images, veuillez accéder à ce lien :

https://github.com/jkthompson/pyspark-pictures

Manipulation 3: Batch processing: Wordcount Java

Pour éviter les problèmes inhérents à la charge de l’IDE Intellij, nous créons cette fois-ci notre projet à l’aide du terminal. Mais pour ce faire, veuillez installer maven à l’aide de cette commande :

apt get install maven

Maintenant, créez un projet java mavenisé en lançant la commande suivante :

mvn archetype:generate -DgroupId=tn.enit.tp1 -DartifactId=wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Nous allons exécuter l’application Wordcount avec Spark. Créez un projet Wordcount à l’aide de votre IDE. Votre fichier pom.xml doit ressembler à celà:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>tn.enit.tp1</groupId>
  <artifactId>Wordcount</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>Wordcount</name>
  <url>http://maven.apache.org</url>
  

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>2.4.5</version>
    </dependency>
        <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
	<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version> <!-- Or the appropriate version -->
</dependency>

</dependencies>

</project>

Créez un fichier sous le nom Wordcount.java et copiez son contenu :

package tn.enit.tp1;

import com.google.common.base.Preconditions;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.Arrays;
public class Wordcount {
  private static final Logger LOGGER = LoggerFactory.getLogger(Wordcount.class);

  public static void main(String[] args) {
      Preconditions.checkArgument(args.length > 1, "Please provide the path of input file and output dir as parameters.");
      new Wordcount().run(args[0], args[1]);
  }

  public void run(String inputFilePath, String outputDir) {

      SparkConf conf = new SparkConf()
              .setAppName(Wordcount.class.getName());

      JavaSparkContext sc = new JavaSparkContext(conf);

      JavaRDD<String> textFile = sc.textFile(inputFilePath);
      JavaPairRDD<String, Integer> counts = textFile
              .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
              .mapToPair(word -> new Tuple2<>(word, 1))
              .reduceByKey((a, b) -> a + b);
      counts.saveAsTextFile(outputDir);
  }
}

Si vous êtes sous IntelliJ, vous pouvez lancer l’étape clean puis package du Lifecycle à l’aide du menu Maven situé par défaut au coin supérieur droite de votre écran. Sinon, positionnez vous à l’emplacement du pom.xml et lancez la commande:

mvn clean package

L’exécution réussie de cette commande va vous générer sous le dossier /target/ le fichier wordcount1.0.jar. Vous allez copier ce jar vers le conteneur spark-master :

 sudo docker cp target/Wordcount-1.0.jar spark-master:/Wordcount-1.0.jar

Nous générons ensuite le fichier sur lequel nous exécutons le code Wordcount.

echo "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." > loremipsum.txt

Copiez-le vers le conteneur spark-master :

sudo docker cp loremipsum.txt spark-master:/loremipsum.txt

Connectez-vous sur le conteneur spark-master :

sudo docker exec -it spark-master bash

Lancez finalement le traitement sur spark en mode cluster mais sans cluster manager spécifique:

/root/spark/bin/spark-submit   --class tn.enit.tp1.Wordcount --master local Wordcount-1.0.jar loremipsum.txt output

Le résultat que vous attendez ressemblera à cela :

Description of the image

Manipulation 4: Batch processing and serving layer

Nous abordons dans cette partie, une étude de cas de traitement en lots (batch processing) des données d’une chaîne de restauration. Nous disposons de deux fichiers

  • Restaurant_details.csv a pour champs : id,restaurant_name,category,esimated_cooking_time,latitude,longitude
  • Order_details.csv a pour champs : order_created_timestamp,status,price,discount,id,driver_id,user_id,restaurant_id

Nous voulons à travers l’étude des données massives enregistrées extraire les réponses suivantes :

  • 1.Le temps moyen de traitement de chaque catégorie de plat2.
  • 2.Le top 3 des restaurants ayant un temps de préparation de plat réduit

Pour ce faire, nous allons adopter l’architecture de système illustré par la figure 1 à travers ce tp. Si ce n’est pas déjà fait, lancez votre réseau de conteneur du Lab3.

Description of the image

Postgres

Connectez-vous au conteneur hive-server. Une fois connecté, nous allons lancer un script pour configurer sqoop.

chmod 7 start-hive.sh

Puis lancez :

./start-hive.sh

Créez avec hive cli la base de données lab3_big_data. Accédez au container de postgres avec la commande :

sudo docker exec -it postgres-db psql -U postgres

Affichez toutes les bases de données que vous avez à bord avec la commande :

\l

Vous allez accéder à la base « temp_db » avec la commande :

\c temp_db

Vous pouvez vérifier s’il y a des tables dans la base :

\dt

Nous allons créer une table qui s’appelle order_detail que nous chargerons d’un fichier csv. Supprimez la table si elle existe.

DROP TABLE IF EXISTS order_detail;

Puis créez la table:

CREATE TABLE IF NOT EXISTS order_detail (
order_created_timestamp TIMESTAMP,
    status VARCHAR,
    price INT,
    discount FLOAT,
    id VARCHAR PRIMARY KEY NOT NULL,
    driver_id VARCHAR,
    user_id VARCHAR,
    restaurant_id VARCHAR
);

Puis lancez

COPY order_detail FROM '/var/lib/postgresql/data/lmwn/order_detail.csv' DELIMITER ',' CSV HEADER;

Maintenant, nous chargerons les données qui proviennent du fichier restaurant_detail.csv . Lancez:

DROP TABLE IF EXISTS restaurant_detail;

Puis:

CREATE TABLE IF NOT EXISTS restaurant_detail (
    id VARCHAR PRIMARY KEY NOT NULL,
    restaurant_name VARCHAR,
    category VARCHAR,
    estimated_cooking_time FLOAT,
    latitude FLOAT,
    longitude FLOAT
);

Ensuite, lancez:

COPY restaurant_detail FROM '/var/lib/postgresql/data/lmwn/restaurant_detail.csv' DELIMITER ',' CSV HEADER;

Quittez postgres en tapant

\q

Hive et Sqoop

Nous allons maintenant importer les données de la table que nous venons de créer sur notre RDBMS vers Hive à l’aide de Sqoop. Tapez la commande suivante en se connectant au conteneur hive-server pour importer la table order_detail:

/usr/lib/sqoop/bin/sqoop import --connect jdbc:postgresql://database:5432/temp_db --table order_detail --username postgres --password passw0rd --hive-import --hive-table lab3_big_data.order_detail --num-mappers 1 --map-column-hive order_created_timestamp=STRING,status=STRING,price=INT,discount=FLOAT,id=STRING,driver_id=STRING,user_id=STRING,restaurant_id=STRING

Tapez la commande suivante pour importer la table restaurant_detail:

/usr/lib/sqoop/bin/sqoop import --connect jdbc:postgresql://database:5432/temp_db --table restaurant_detail --username postgres --password passw0rd --hive-import --hive-table lab3_big_data.restaurant_detail --num-mappers 1 --map-column-hive id=STRING,restaurant_name=STRING,category=STRING,estimated_cooking_time=FLOAT,latitude=FLOAT,longitude=FLOAT

Connectez-vous à hive-cli:

sudo docker exec -it hive-server bash

Créez la table order_detail_hive sous la base default.

DROP TABLE IF EXISTS order_detail_hive;
CREATE EXTERNAL TABLE IF NOT EXISTS order_detail_hive (
    order_created_timestamp TIMESTAMP,
    status STRING,
    price INT,
    discount FLOAT,
    id STRING,
    driver_id STRING,
    user_id STRING,
    restaurant_id STRING
)
STORED AS PARQUET
LOCATION '/user/spark/order_detail_hive';

Vérifiez si vos données ont été migrées vers Hive.

SELECT * FROM order_detail_hive LIMIT 5;

Créez la table restaurant_detail sous la base default (ca veut dire ne choisissez aucune base dès que vous vous connectez à hive-cli):

DROP TABLE IF EXISTS restaurant_detail_hive;

Puis:

CREATE EXTERNAL TABLE IF NOT EXISTS restaurant_detail_hive (
    id STRING,
    restaurant_name STRING,
    category STRING,
    estimated_cooking_time FLOAT,
    latitude FLOAT,
    longitude FLOAT
)
STORED AS PARQUET
LOCATION '/user/spark/restaurant_detail_hive';

Vérifiez l’état du transfert:

SELECT * FROM restaurant_detail_hive LIMIT 5;

Nous effectuons ici un nettoyage des données sur les deux tables :

INSERT OVERWRITE TABLE order_detail_hive
SELECT
    order_created_timestamp,
    status,
    price,
    COALESCE(discount, 0.0) AS discount,
    id,
    driver_id,
    user_id,
    restaurant_id
FROM
        lab3_big_data.order_detail;
INSERT OVERWRITE TABLE restaurant_detail_hive
SELECT
    id,
    restaurant_name,
    category,
    CASE
        WHEN estimated_cooking_time BETWEEN 10 AND 40 THEN 1
        WHEN estimated_cooking_time BETWEEN 41 AND 80 THEN 2
        WHEN estimated_cooking_time BETWEEN 81 AND 120 THEN 3
        WHEN estimated_cooking_time > 120 THEN 4
        ELSE NULL
    END AS cooking_time_category,
    latitude,
    longitude
FROM
    lab3_big_data.restaurant_detail;

Batch view

Déconnectez-vous du conteneur hive-server. Sous le dossier enit_lab_big_data, créez le fichier RestaurantProcessingTime.py.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, desc, asc

spark = SparkSession.builder \
    .appName("Restaurant Processing Time") \
    .enableHiveSupport() \
    .config("spark.sql.parquet.writeLegacyFormat",True)\
    .getOrCreate()

# Charger les tables depuis Hive
restaurantDetail = spark.read.parquet("hdfs://namenode:9000/user/spark/restaurant_detail_hive")
orderDetail = spark.read.parquet("hdfs://namenode:9000/user/spark/order_detail_hive")

# Calculer le temps moyen de traitement pour chaque catégorie de plat
avgProcessingTimePerCategory = orderDetail.join(restaurantDetail, orderDetail["restaurant_id"] == restaurantDetail["id"]) \
    .groupBy("category") \
    .agg(avg("estimated_cooking_time").alias("avg_processing_time")) \
    .orderBy(desc("avg_processing_time"))

# Afficher le temps moyen de traitement pour chaque catégorie de plat
print("Temps moyen de traitement pour chaque catégorie de plat :")
avgProcessingTimePerCategory.show()

# Calculer le top 3 des restaurants ayant un temps de préparation réduit
top3Restaurants = restaurantDetail.join(orderDetail, orderDetail["restaurant_id"] == restaurantDetail["id"]) \
    .groupBy("restaurant_name") \
    .agg(avg("estimated_cooking_time").alias("avg_cooking_time")) \
    .orderBy(asc("avg_cooking_time")) \
    .limit(3)

# Afficher le top 3 des restaurants ayant un temps de préparation réduit
print("Top 3 des restaurants avec un temps de préparation réduit :")
top3Restaurants.show()

# Enregistrer le temps moyen de traitement par catégorie dans un fichier CSV
avgProcessingTimePerCategory.write.csv("/avg_processing_time_per_category.csv")

# Enregistrer le top 3 des restaurants dans un fichier CSV
top3Restaurants.write.csv("/top_3_restaurants.csv")

spark.stop()

Copiez avec docker ce fichier dans le conteneur spark-master. Puis, accédez au conteneur spark-master.

sudo docker exec -it spark-master bash

Lancez la commande suivante pour soumettre le travail au spark driver.

/spark/bin/spark-submit --master local --driver-memory 4g --executor-memory 2g --executor-cores 2 --py-files RestaurantProcessingTime.py RestaurantProcessingTime.py

Vous devez avoir deux répertoires top_3_restaurants.csv et avg_processing_time_per_category.csv. Tapez à chaque fois la commande suivante :

cd top_3_restaurants.csv && cat part-*
cd avg_processing_time_per_category.csv && cat part-*

Résultat de l'exécution de Spark

Résultat 2 de l'exécution de Spark

A vous de jouer

Chargez les fichiers générés par Spark dans une RDBMS (ou utilisez les directement) pour les afficher dans une interface UI web du langage de votre choix.

Vous appliquez la manipulation 4 sur vos données pour faire un batch processing et un serving Layer en utilisant Spark, Hive, Postgres, Sqoop et Web UI.