BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Articles Traitements Big Data avec Apache Spark - 2ème partie : SparkSQL

Traitements Big Data avec Apache Spark - 2ème partie : SparkSQL

Dans l’article précédent de cette série sur Apache Spark, nous avons vu de quoi est constitué le framework et en quoi celui-ci aide à répondre aux besoins d’analyses big data de l’entreprise. Spark SQL, composant du framework Apache Spark, est utilisé pour effectuer des traitements sur des données structurées en exécutant des requêtes de type SQL sur les données Spark. Il nous permet d’exécuter des requêtes ad-hoc après une étape d’ETL sur des données stockées sous différents formats, comme JSON ou Parquet, ou des données stockées dans des bases de données par exemple.

Dans ce deuxième article, nous examinerons la librairie Spark SQL et nous verrons comment celle-ci peut être utilisée pour effectuer des requêtes SQL sur des données stockées dans des fichiers plats, des jeux de données JSON ou des tables Hive. Spark 1.3 est la dernière version du framework, mise à disposition le mois dernier. Avant cette version, le module Spark SQL était au statut “Alpha”. Cette version comprend plusieurs nouvelles fonctionnalités, dont :

  • DataFrame : La nouvelle version fournit une abstraction appelée DataFrames qui peut se voir comme une source SQL distribuée.
  • Data Sources : Avec l’ajout de l’API data sources, Spark SQL facilite les traitements de données structurées stockées selon une grande variété de formats dont Parquet, JSON et Apache Avro.
  • JDBC Server : Un serveur JDBC intégré permet de se connecter facilement à des données stockées dans des bases relationnelles et de réaliser des analyses big data depuis les outils BI traditionnels.

Les composants de Spark SQL

Les deux principaux composants manipulés lorsque l’on utilise Spark SQL sont les DataFrames et SQLContext. Commençons par nous intéresser à DataFrame.

DataFrame

Une DataFrame est une collection de données distribuées, organisées en colonnes nommées. Ce concept est basé sur celui des data frames du langage R et est similaire à une table dans le monde des bases de données relationnelles. Notez que c’est en fait ce qu’on appelait SchemaRDD dans les précédentes versions de l’API Spark SQL qui a été renommé DataFrame. Les DataFrames peuvent être converties en RDD en appelant la méthode rdd, qui retourne le contenu de la DataFrame sous forme de RDD de lignes.

Les DataFrames peuvent être créées à partir de différentes sources de données, notamment depuis :

  • Des RDD existants
  • Des fichiers de données structurées
  • Des jeux de données JSON
  • Des tables HIVE
  • Des bases de données externes

Spark SQL et l’API DataFrame sont disponibles dans les langages suivants :

Les exemples de code présentés dans cet article utilisent le Shell Scala de Spark.

SQLContext

Spark SQL fournit SQLContext afin d’encapsuler les fonctions du monde relationnel dans Spark. Le SQLContext est créé à partir du SparkContext existant, vu dans les exemples précédents. L’extrait de code suivant montre comment créer un objet SQLContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

HiveContext fournit aussi un sur ensemble des fonctionnalités proposées par SQLContext. Il peut être utilisé pour écrire des requêtes en utilisant le parser HiveQL et lire des données depuis des tables Hive. Notez que vous n’avez pas besoin d’un environnement Hive existant pour utiliser HiveContext dans les programmes Spark.

Sources de données JDBC

Parmi les autres fonctionnalités de la librairie Spark SQL, on trouve les sources de données JDBC. Celles-ci peuvent être utilisées pour lire des données depuis des bases de données relationnelles avec l’API JDBC. Cette approche est favorisée par rapport à l’utilisation du JdbcRDD car la source de données retourne les résultats sous forme de DataFrame, qui peut être traitée dans SparkSQL ou jointe à d’autres sources de données.

Exemple d’application SparkSQL

Dans l’article précédent, nous avons appris comment installer le framework Spark sur une machine locale, comment le lancer et interagir avec depuis un programme Shell Scala. Pour installer la dernière version de Spark, téléchargez-la depuis le site web. Pour les exemples de code de cet article, nous utiliserons le Shell pour exécuter les programmes Spark SQL. Ces exemples de code sont faits pour un environnement Windows. Pour s’assurer que le programme Shell ait assez de mémoire, utilisez l’argument de ligne de commande driver-memory lors de l’exécution de spark-shell :

spark-shell.cmd --driver-memory 1G

L’application Spark SQL

Une fois que vous avez lancé le Shell, vous pouvez exécuter des requêtes analytiques en utilisant l’API Spark SQL. Dans le premier exemple, nous chargeons des données sur des clients depuis un fichier texte et créons un objet DataFrame à partir du jeu de données. Ensuite, nous pouvons exécuter des fonctions sur la DataFrame pour effectuer des requêtes de sélection des données. Examinons le contenu du fichier texte customers.txt présenté ci-dessous :

100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727

Le code suivant montre les commandes Spark SQL que vous pouvez exécuter depuis la console Spark Shell.

// Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

// Create a custom class to represent the Customer
case class Customer(customer_idIntnameStringcityStringstateStringzip_codeString)

// Create a DataFrame of Customer objects from the dataset text file.
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()

// Register DataFrame as a table.
dfCustomers.registerTempTable("customers")

// Display the content of DataFrame
dfCustomers.show()

// Print the DF schema
dfCustomers.printSchema()

// Select customer name column
dfCustomers.select("name").show()

// Select customer name and city columns
dfCustomers.select("name""city").show()

// Select a customer by id
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()

// Count the customers by zip code
dfCustomers.groupBy("zip_code").count().show()

Dans l’exemple ci-dessus, le schéma est inféré par réflexion. Il est également possible de spécifier le schéma du dataset par le code. Ceci est utile lorsque des classes spécifiques ne peuvent pas être définies à l’avance parce que la structure de la donnée est encodée dans une chaîne de caractères. Le code suivant montre comment spécifier le schéma en utilisant les nouveaux types de données StructType, StringType et StructField.

//
// Programmatically Specifying the Schema
//

// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val rddCustomers = sc.textFile("data/customers.txt")

// The schema is encoded in a string
val schemaString = "customer_id name city state zip_code"

// Import Spark SQL data types and Row.
import org.apache.spark.sql._

import org.apache.spark.sql.types._;

// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringTypetrue)))

// Convert records of the RDD (rddCustomers) to Rows.
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))

// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
dfCustomers.registerTempTable("customers")

// SQL statements can be run by using the sql methods provided by sqlContext.
val custNames = sqlContext.sql("SELECT name FROM customers")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => "Name: " + t(0)).collect().foreach(println)

// SQL statements can be run by using the sql methods provided by sqlContext.
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
customersByCity.map(t => t(0+ "," + t(1)).collect().foreach(println)

Vous pouvez aussi charger les données d’autres sources comme des fichiers JSON, des tables Hive ou même des bases de données avec JDBC. Comme vous pouvez le voir, Spark SQL fournit une bonne interface pour interagir avec des données chargées de différentes sources en utilisant la syntaxe familière du langage SQL, ce qui est particulièrement utile pour les membres des projets non développeurs, comme les analystes ou les DBA.

Conclusion

Dans cet article, nous avons vu comment Apache Spark SQL fournit une interface SQL permettant d’interagir avec les données Spark en utilisant la syntaxe familière du SQL. Spark SQL est une librairie puissante que les non-techniciens, les analystes métier ou les analystes données peuvent utiliser pour exécuter des analyses de données.

Dans le prochain article, nous examinerons la librairie Spark Streaming qui peut être utilisée pour traiter des données temps-réel ou en flux. Cette librairie s’applique à une autre part importante du processus de traitement et de gestion des données de l’entreprise. En effet, puisque ces données offrent une vision temps-réel de ce qu’il se passe dans les systèmes et les processus, elle nous permet d’adresser des cas d’utilisation critiques pour l’entreprise comme la détection de fraude, le commerce en ligne, les traitements d’événements,... etc.

Références

Au sujet de l’Auteur

Srini Penchikala travaille actuellement en tant qu’architecte applicatif au sein d’une société de services financiers à Austin au Texas. Il a plus de 20 ans d’expérience en architecture, conception et développement. Srini écrit actuellement un livre sur les patterns dans le contexte des bases NoSQL. Il est aussi le co-auteur de "Spring Roo in Action", aux éditions Manning. Il a été présentateur lors de plusieurs conférences, comme JavaOne, SEI Architecture Technology Conference (SATURN), IT Architect Conference (ITARC), No Fluff Just Stuff, NoSQL Now et Project World Conference. Srini a publié également de nombreux articles sur l’architecture, la sécurité et la gestion des risques et les bases NoSQL sur divers sites, tels qu’InfoQ, The ServerSide, OReilly Network (ONJava), DevX Java, java.net et JavaWorld. Il est Lead Editor de la communauté bases de données NoSQL chez InfoQ.

Evaluer cet article

Pertinence
Style

Contenu Éducatif

BT