BT

Disseminando conhecimento e inovação em desenvolvimento de software corporativo.

Contribuir

Tópicos

Escolha a região

Início Artigos Big Data com Apache Spark Parte 2: Spark SQL

Big Data com Apache Spark Parte 2: Spark SQL

No artigo anterior desta série sobre Apache Spark, aprendemos o que é o framework Apache Spark e como ele pode ser útil no processamento de dados analíticos de Big Data necessário para empresas.

O Spark SQL, é parte integrante do framework de Big Data Apache Spark, utilizado para processamento de dados estruturados, que permite executar consultas SQL no conjunto de dados do Spark. É possível realizar tarefas ETL sobre os dados em diferentes formatos, como por exemplo JSON, Parquet, banco de dados e com isto, executar consultas ad-hoc.

Nesta segunda parte da série, vamos abordar a biblioteca Spark SQL, e como ela pode ser utilizada para executar consultas SQL sobre dados armazenados em arquivos batch, sobre conjunto de dados JSON ou em tabelas do HIVE.

A última versão do Spark é a 1.6.1 lançada em março de 2016. Até antes da versão 1.3, o módulo Spark SQL ainda encontrava-se com status de "Alpha" mas a partir da versão 1.3, o time de desenvolvimento do framework removeu esta etiqueta da biblioteca. Entre outras funcionalidades existentes, alguns destaques incluem:

  • DatFrame: Desde a versão 1.3, as novas versões do Spark fornecem uma abstração de programação nomeada de DataFrames, o qual pode agir como um motor de consultas SQL distribuído. DataFrames podem fornecer funções de alto nível, permitindo que o Spark compreenda melhor a estrutura dos dados, assim como o cálculo a ser executado. Esta informação adicional permite que o otimizador Catalyst e o mecanismo de execução Tungsten acelerem automaticamente as análises de Big Data.
  • API Data Source: Com a inclusão de uma API para fontes de informação - Data sources API - a biblioteca Spark SQL permite que a computação de informações sobre dados armazenados de forma estruturada seja facilitada e mais abrangente, incluindo formatos como: Parquet, JSON e a biblioteca Avro da Apache Fundation.
  • Servidor JDBC interno: Agora, com um servidor JDBC interno, a conexão com banco de dados relacionais para consultar dados estruturados em tabelas e realizar análises de Big Data pode ser feita com ferramentas de BI - Business Inteligence - tradicionais.
  • Funcionalidades para Data Science: Recursos para aplicações de machine learning aumentaram o pipeline de recursos do Spark ML para a construção de pipelines de aprendizado. No passado, para um aplicativo armazenar um pipeline externamente, era necessário implementar código personalizado. No Spark 1.6, a API de pipeline oferece funcionalidades para salvar e recarregar pipelines a partir de um estado anterior e aplicar modelos construídos anteriormente com novos dados. Por exemplo, os usuários podem treinar um pipeline em uma tarefa durante a noite e, em seguida, aplicá-lo aos dados de produção.
  • Novos algoritmos para Machine Learning: A versão 1.6 também aumenta a cobertura de algoritmos para aprendizado de máquina, incluindo entre outros:

Componentes do Spark SQL

Os dois principais componentes ao se utilizar o Spark SQL são o DataFrame e o SQLContext.

Vamos detalhar primeiramente o DataFrame.

DataFrame

Um DataFrame é uma coleção de dados distribuídos e organizados em forma de colunas nomeadas. É baseado no conceito de estrutura de dados da linguagem R, similar a uma tabela de um banco de dados relacional.

Antes da versão 1.3, o componente DataFrame era chamado de SchemaRDD. DataFrames podem ser transformados em RDDs por meio de uma chamada de método RDD, que retorna o conteúdo de um DataFrame como um conjunto de linhas RDD.

DataFrames podem ser criados a partir de diferentes fontes de informações, como por exemplo:

  • RDDs já existentes
  • Arquivos de dados estruturados
  • conjunto de dados JSON
  • Tabelas Hive
  • Banco de dados externos

O Spark SQL e a API de DataFrame estão disponíveis nas seguintes linguagens de programação:

  • Scala (https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.package
  • Java (https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/sql/api/java/package-summary.html)
  • Python (https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html)

Os códigos Spark SQL de exemplo utilizados neste artigo utilizam o programa Spark Scala Shell.

SQLContext

O Spark SQL fornece o componente SQLContext para encapsular todas as funcionalidades relacionais no Spark. É possível criar os SQLContext a partir do SparkContext existente e que foi utilizado nos exemplos anteriores. O trecho de código abaixo mostra como criar um objeto SQLContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Existe também um componente HiveContext o qual fornece um conjunto maior de funcionalidades para o SQLContext. Este componente pode ser utilizado para escrever consultas utilizando o HiveQL e com isto, ler dados de tabelas Hive.

É importante destacar que não é necessário um ambiente Hive para utilizar o componente HiveContext em programas Spark.

Fontes de Dados JDBC

Outras funcionalidades na biblioteca Spark SQL incluem fontes de informação que fazem uso de JDBC como interface de integração.

Interfaces de integração JDBC podem ser utilizadas para ler informações armazenadas em banco de dados relacionais. O JDBC é um conjunto de classes e interfaces (API) escritas em Java para execução e manipulação de resultados de instruções SQL para qualquer banco de dados relacional. Para cada Banco de dados há um driver JDBC. O uso desta abordagem deve ser mandatório em relação ao uso de JdbcRDD devido ao fato de que a API retorna os resultados como um DataFrame, o qual pode ser processado no Spark SQL ou combinado com outras fontes de dados.

Uma aplicação Spark SQL de exemplo

No artigo anterior, aprendemos como instalar o Spark em uma máquina local, como iniciar e interagir com o Spark por meio do programa Spark Scala Shell. Para instalar a última versão do Spark, faça o download diretamente do website.

Para os exemplos de código deste artigo, serão utilizados os mesmos códigos do Spark Shell para execução dos códigos do Spark SQL. Observe que os exemplos de código estão preparados para o ambiente Windows.

Para certificar-se de que o programa Spark Shell tem memória suficiente para executar, use o parâmetro driver-memory como argumento de linha de comando ao executar o spark-shell, como demonstrado abaixo:

spark-shell.cmd --driver-memory 1G

A aplicação Spark SQL

Uma vez que a aplicação Spark Shell foi iniciada, podemos executar consultas de análise de dados utilizando a API Spark SQL.

No primeiro exemplo, são carregadas informações de consumidores a partir de um arquivo e então é criado um objeto DataFrame a partir do conjunto de dados. Com isto, é possível executar funções DataFrame como consultas específicas para selecionar os dados.

Abaixo, podemos observar o conteúdo do arquivo texto customers.txt.

100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727

O trecho de código a seguir apresenta os comandos Spark SQL que podem ser executados no console do Spark Shell

// Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

// Create a custom class to represent the Customer
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// Create a DataFrame of Customer objects from the dataset text file.
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()

// Register DataFrame as a table.
dfCustomers.registerTempTable("customers")

// Display the content of DataFrame
dfCustomers.show()

// Print the DF schema
dfCustomers.printSchema()

// Select customer name column
dfCustomers.select("name").show()

// Select customer name and city columns
dfCustomers.select("name", "city").show()

// Select a customer by id
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()

// Count the customers by zip code
dfCustomers.groupBy("zip_code").count().show()

No exemplo acima, o esquema é inferido por meio de reflexão. É possível também especificar programaticamente o esquema do conjunto de dados. Isso é útil quando as classes personalizadas não podem ser definidas antes do tempo, devido a estrutura dos dados estar codificada em formato de uma string.

O código a seguir demonstra como especificar um esquema utilizando os novos tipos de classes de dados StructType, StringType, e StructField.

//
// Programmatically Specifying the Schema
//

// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val rddCustomers = sc.textFile("data/customers.txt")

// The schema is encoded in a string
val schemaString = "customer_id name city state zip_code"

// Import Spark SQL data types and Row.
import org.apache.spark.sql._

import org.apache.spark.sql.types._;

// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (rddCustomers) to Rows.
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))

// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
dfCustomers.registerTempTable("customers")

// SQL statements can be run by using the sql methods provided by sqlContext.
val custNames = sqlContext.sql("SELECT name FROM customers")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => "Name: " + t(0)).collect().foreach(println)

// SQL statements can be run by using the sql methods provided by sqlContext.
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)

É possível carregar as informações de outras fontes de dados como por exemplo: arquivos de dados JSON, tabelas Hive ou mesmo tabelas de banco de dados relacionais usando as fontes de dados JDBC.

Como foi visto, o Spark SQL fornece uma interface SQL que permite a interação com as informações que são carregadas de várias fontes de dados por meio de consultas SQL, familiares às equipes de desenvolvimento. Este recurso é útil especialmente para integrantes da equipe sem perfil técnico como analistas de dados, por exemplo, pois permite a este tipo de perfil realizar consultas assim como DBAs.

Conclusão

Neste artigo, demonstramos como o Apache Spark SQL trabalha para fornecer uma interface SQL capaz de interagir com informações armazenadas no Spark utilizando sintaxe de consulta SQL familiar. O Spark SQL é uma biblioteca poderosa que integrantes não técnicos de equipes, como por exemplo: Analistas de Negócio e Analistas de Dados, podem utilizar para realizar análise de dados para suas empresas.

No próximo artigo da série, abordaremos a biblioteca Spark Streaming que pode ser utilizada para processar dados em tempo real ou fazer streaming de dados. Esta biblioteca é outra parte importante do ciclo global de processamento de dados e do ciclo de vida de gestão em uma organização, porque o processamento de dados de streaming nos dá os insights em tempo real sobre os sistemas. Esta funcionalidade é crítica para alguns casos de uso, por exemplo: detecção de fraude, sistemas de negociação online, soluções de processamento baseadas em eventos e etc.

Sobre o autor

Srini Penchikala atualmente trabalha como Arquiteto de Software em uma organização de serviços financeiros em Austin, Texas. Ele tem mais de 20 anos de experiência em arquitetura, design e desenvolvimento. Srini está atualmente escrevendo um livro sobre padrões de bancos de dados NoSQL e também é co-autor do livro "Spring Roo in Action" da Manning Publications. Ele palestrou em diversas conferências como: JavaOne, SEI Architecture Technology Conference (SATURN), IT Architect Conference (ITARC), No Fluff Just Stuff, NoSQL Now e Project World Conference. Srini também publicou diversos artigos sobre arquitetura de software, segurança e gerenciamento de risco, e sobre banco de dados NoSQL em sites como o InfoQ, The ServerSide, OReilly Network (ONJava), DevX Java, java.net e JavaWorld. Ele é o Editor Líder de banco de dados no NoSQL na comunidade do InfoQ.

Avalie esse artigo

Relevância
Estilo/Redação

Conteúdo educacional

BT