BT

Disseminando conhecimento e inovação em desenvolvimento de software corporativo.

Contribuir

Tópicos

Escolha a região

Início Artigos Apache Arrow e Java: Transferência de Big Data na velocidade da luz

Apache Arrow e Java: Transferência de Big Data na velocidade da luz

Favoritos

Pontos Principais

  • O recurso cópia zero de transferência de dados do Arrow para aplicações analíticas.

  • O Arrow possibilita o processamento in-memory em formatação colunar.

  • O Arrow é multi-plataforma, multi-linguagem e interoperável na troca de dados.

  • O Arrow é a estrutura principal para sistemas de Big Data.

Por sua própria natureza o Big Data é muito grande para caber em uma única máquina. O conjunto de dados precisa ser particionado entre várias máquinas. Cada partição é atribuída a uma máquina primária com tarefas de backup opcional. Consequentemente toda máquina tem múltiplas partições. A maioria dos frameworks de Big Data usam uma estratégia aleatória para atribuir partições às máquinas. Se cada tarefa de processamento usa uma partição, essa estratégia resulta na pulverização da carga de processamento no cluster. No entanto, se uma tarefa precisa de múltiplas partições, há uma grande chance de precisar buscar partições de outras máquinas e a transferência de dados será sempre uma penalidade para a performance.

O Apache Arrow apresenta multi-linguagem, multiplataforma e uma formatação de dados colunar in-memory. Ele elimina a necessidade de serialização, pois os dados são representados pelos mesmos bytes em cada plataforma e linguagem de programação. Essa formatação habilita a transferência de dados cópia-zero em sistemas Big Data, minimizando a queda de performance devido à transferência de dados.

O objetivo deste artigo é apresentar o Apache Arrow e deixar você familiarizado com os conceitos básicos da biblioteca Java do Apache Arrow. O código fonte que acompanha este artigo pode ser encontrado aqui.

Tipicamente, uma transferência de dados consiste de:

  • serialização de dados em formatação única
  • envio de dados serializados através de uma conexão de internet
  • deserialização de dados no ambiente de destino

Pense por exemplo na comunicação entre frontend e backend numa aplicação web. Comumente, a formatação JavaScript Object Notation (JSON) é usado para serializar dados. Para pequenas quantidades é ótimo. O custo adicional de serialização e deserialização é insignificante e o JSON utiliza texto legível simplificando a depuração. No entanto, quando volume de dados aumenta, o custo da serialização pode se tornar um fator impactante no desempenho. Sem o devido cuidado, sistemas podem acabar com a maior parte do tempo serializando dados. Obviamente, existem coisas mais úteis para fazer com os ciclos da sua CPU.

Nesse processo, há um fator que nós controlamos no software: (de)serialização. Não é preciso dizer que existe uma infinidade de frameworks de serialização disponíveis como ProtoBuf, Thrift, MessagePack, e muitos outros. Muitos deles têm como objetivo principal minimizar as penalizações da serialização.

Apesar dos esforços para minimizar a serialização, há um passo inevitável na (de)serialização. Os objetos em que seu código atua não são os bytes que são enviados pela rede. Os bytes que são recebidos pelo cabo, não são os objetos em que o código atua do outro lado. No fim, a serialização rápida não é serialização.

O Apache Arrow serve para mim?

Conceitualmente, o Apache Arrow foi desenhado como uma estrutura principal para sistemas Big Data, por exemplo, no Ballista ou no Dremio, ou para integrações de sistema Big Data. Se o seu uso não é na área de sistemas Big Data, então provavelmente a exigência de recursos adicionais do Apache Arrow não vai valer a pena. Será melhor uma serialização de framework que é amplamente adotada como o ProtoBuf, o FlatBuffers, o Thrift, o MessagePack ou outros.

Escrever código com o Apache Arrow é muito diferente de escrever código com os objetos planos do Java, no sentido de que não haverão mais os objetos do Java. O código opera nos buffers até o fim. Bibliotecas existentes como por exemplo: Apache Commons, Guava, etc., não servem mais. Você poderá ter de re-implementar alguns algoritmos para trabalharem com byte buffers. E não menos importante, você deverá sempre de pensar em termos de colunas ao invés de objetos.

Construir um sistema sobre o Apache Arrow exige ler, escrever, respirar e suar buffers de Arrow. Se você está construindo um sistema que trabalha com coleções de objetos de dados (por exemplo: algum banco de dados) , e quer processar coisas compatíveis com colunas e está planejando rodar isso num cluster, então o Arrow definitivamente vale o investimento.

A integração com o Parquet (discutida adiante) torna a persistência relativamente fácil. A multiplataforma e o aspecto de multi linguagem suporta arquiteturas de microservices poliglotas e permite uma fácil integração com o cenário Big Data. O framework RPC integrado chamado Arrow Flight facilita compartilhar ou entregar conjuntos de dados de forma padronizada e eficiente.

Transferência de dados cópia-zero

Por que precisamos da serialização? Numa aplicação em Java normalmente você trabalha com objetos e valores primitivos. Esse objetos são de alguma forma mapeados para bytes na memória RAM do seu computador. O JDK entende como esse mapeamento é feito no seu computador mas pode ser diferente em outra máquina. Pense por exemplo na ordenação de bytes (também conhecida como extremidade). Além disso, nem todas as linguagens de programação tem o mesmo conjunto de tipos de dados primitivos, nem armazenam tipos similares do mesmo jeito.

A serialização converte a memória usada por objetos numa formatação comum. Essa formatação tem uma especificação e para cada linguagem de programação e plataforma uma biblioteca é disponibilizada convertendo objetos para a forma serializada e vice e versa. Em outras palavras serialização é compartilhar dados sem romper com as particularidades de cada linguagem de programação e plataforma. A serialização suaviza todas as diferenças entre plataformas e linguagens de programação, assim como tradutores suavizam as barreiras da língua entre pessoas que falam línguas diferentes, permitindo às pessoas que programam trabalharem do jeito que gostam.

A Serialização é bastante útil na maioria das situações. No entanto, quando estamos transferindo grandes quantidades de dados torna-se um imenso gargalo. Consequentemente, será que podemos eliminar o processo de serialização nesses casos? Na realidade esse é o objetivo dos frameworks de serialização cópia zero, como o Apache Arrow e o FlatBuffers. Podemos pensar nisso como trabalhar no próprio dado serializado ao invés de trabalhar nos objetos para evitar o passo da serialização. A cópia zero se refere ao fato de que os bytes da aplicação podem ser transferidos através do cabo sem qualquer modificação. Assim como, no recebimento a aplicação poderá trabalhar nos bytes como são, sem o passo da desserialização.

A grande vantagem aqui é que o dado pode ser transferido como ele é de um ambiente para o outro sem tradução alguma por que o dado é entendido nos dois lados da conexão.

A grande desvantagem é perder as particularidades na programação. Todas as operações são executadas em buffers de bytes. Não existem inteiros ou matrizes (array), existem sequências de bytes. Não há objetos, mas uma coleção de sequências de bytes. Naturalmente, ainda podemos converter o tipo de dado para inteiro, matriz ou objeto, mas estaremos fazendo uma serialização e acabando com o propósito da cópia zero. Uma vez transferido para objetos do Java apenas o Java poderá trabalhar com o dado.

Como funciona na prática? Vamos dar uma rápida olhada em dois frameworks de serialização cópia zero: o Apache Arrow e o FlatBuffers da Google. Apesar de ambos serem frameworks cópia zero, eles se apresentam de forma diferente em diferentes casos de uso.

O FlatBuffers foi inicialmente desenvolvido para jogos mobile. O foco é na rápida transmissão de dados do servidor para o cliente, com mínima carga adicional. Podendo enviar um único objeto ou uma coleção de objetos. Os dados são armazenados (empilhados) em Buffers de Bytes, formatados nos FlatBuffers com layout padrão de dados. Os compiladores FlatBuffers vão gerar códigos baseados na especificação dos dados simplificando a interação com os Buffers de bytes, permitindo trabalhar os dados como se fossem uma matriz, um objeto ou um tipo primitivo. Nos bastidores, cada método acessor busca os bytes correspondentes e os converte em formatos compreensíveis ao JVM e ao código. E se por qualquer razão for necessário, ainda é possível acessar os bytes.

O Arrow e o FlatBuffers são diferentes na maneira de organizar listas, matrizes, e tabelas na memória. Enquanto o FlatBuffers usa uma formatação de orientação por linha para as tabelas, o Arrow usa uma formatação colunar para armazenar dados tabulares. Isso faz toda a diferença para as queries analíticas (OLAP) utilizadas no big data.

O Arrow é auxiliado por sistemas de big data nos quais normalmente não ocorre a transferência de um único objeto, mas de grandes coleções de objetos. O FlatBuffers por outro lado, é vendido e usado como um framework de serialização. Em outras palavras, o código que trabalha em objetos e tipos primitivos do Java só é transformado em dados no modelo de memória do FlatBuffers no envio dos dados. Se o outro lado do cabo é apenas leitura (read-only), não será necessário desserializar os dados em objetos do Java, o dado poderá se lido diretamente dos buffers de byte do FlatBuffers.

Em um grande conjunto de dados, o número de linhas pode variar normalmente de milhares à trilhões de linhas. Assim como variar de algumas à milhares de colunas.

Uma consulta analítica típica referência um determinado grupo de colunas. Imagine por exemplo um conjunto de dados de transações de e-commerce e que o gerente queira uma visualização das vendas, de uma determinada região agrupadas por categorias de item. Ele não vai querer ver cada venda individualmente, o preço de venda padrão será suficiente. Essa consulta pode ser respondida em três passos:

  • Cruzando todos os valores com a coluna "região", localizando todos os IDs das linhas/objetos de vendas da região requisitada.
  • Agrupando os IDs filtrados baseado nos valores correspondentes de item da coluna categoria.
  • Processando as agregações para cada grupo

Essencialmente, um processador de consultas precisa ter somente uma coluna na memória o tempo todo. Mas armazenando uma coleção com uma formatação colunar, é possível acessar todos os valores de um único campo/coluna separadamente. Em formatos bem projetados isso é feito de tal forma que o layout é otimizado por instruções SIMD das CPUs. Para essas cargas de trabalho análiticas, o layout colunar do Apache Arrow funciona melhor que o layout orientado por linhas do FlatBuffers.

Apache Arrow

O núcleo do Apache Arrow é o layout de formatação de dados in-memory. Em termos de formatação, o Apache Arrow oferece um conjunto de bibliotecas (incluindo C, C++, C#, Go, Java, Java Script, MATLAB, Python, R, Ruby e Rust) para trabalhar com dados no Apache Arrow. O restante desse artigo trata de familiarizar os conceitos básicos do Arrow, e como escrever uma aplicação em Java usando Apache Arrow.

Conceitos Básicos

Vetor Schema Raiz

Vamos imaginar a modelagem do registro de vendas de uma cadeia de lojas. Geralmente definimos um objeto para representar uma venda e esse objeto terá propriedades como:

  • um id;
  • informações sobre a loja na qual a venda foi feita, como região, cidade e o tipo de loja;
  • algumas informações do cliente;
  • um id do produto vendido;
  • uma categoria (e se possível a subcategoria) do produto vendido;
  • quantos produtos foram vendidos;
  • outros dados;

Em Java, uma venda é modelada por uma classe Sale. A classe contém todas as informações de uma única venda. Então todas as vendas são representadas (in-memory) por uma coleção objetos da classe Sale. Da perspectiva do banco de dados uma coleção de objetos Sale é equivalente a um banco de dados relacional orientado por linhas. De fato, geralmente nesse tipo de aplicação a coleção de objetos é mapeada para uma tabela relacional num banco de dados para persistência.

Em um banco de dados orientado por colunas, a coleção de objetos é decomposta numa coleção de colunas. Todos os IDs estão armazenados numa coluna única. Na memória, todos os IDs estão armazenados sequencialmente e de forma semelhante existe uma coluna para o armazenamento de todas as lojas-cidades para cada venda. Conceitualmente essa formatação colunar pode ser vista como a decomposição da coleção de objetos dentro de um conjunto de matrizes (arrays) de igual comprimento, sendo uma matriz por campo de um objeto.

Para reconstruir um objeto específico são combinados valores escolhidos de cada coluna/matriz num dado índice das matrizes decompostas. Por exemplo, a 10ª venda é recomposta pegando o 10º valor de ID da matriz, o 10º valor da matriz loja-cidade, e assim por diante.

O Apache Arrow funciona como um banco de dados relacional orientado por colunas. Uma coleção de objetos do Java é decomposta numa coleção de colunas que são chamados de vetores no Arrow. O vetor é a unidade básica da formatação colunar do Arrow.

A mãe de todos os vetores é o vetor campo FieldVector. Existem vetores para tipo primitivo, como o Int4Vector e o Flaoat8Vector. Há um tipo de vetor para Strings: o VarCharVector. Há um tipo de vetor para dados binários arbitrários: o VarBinaryVector. Existem vários tipos de vetores para o modelo tempo, como o TimeStampVector, o TimeStampSecVector, o TimeStampTZVector e o TimeMicroVector.

Estruturas mais complexas podem ser criadas. Um StructVector é usado para agrupar um conjunto de vetores dentro de um campo. Pense por exemplo na informação de loja no exemplo de vendas acima. Toda a informação (região, cidade e tipo) pode ser agrupado num único StructVector. O ListVector permite o armazenamento de lista de elementos com comprimento variável num único campo. Um MapVector armazena um valor chave mapeando um único vetor.

Continuando a analogia de banco de dados, uma coleção de objetos é representada por uma tabela. Para identificar valores na tabela, uma tabela tem um schema: um nome para mapeamento de tipo. Em um banco de dados orientado por linha, cada linha mapeia um nome para um valor de tipo predefinido. Em Java, um schema corresponde ao conjunto de variáveis de uma definição de classe. Um banco de dados orientado por coluna também tem um schema. Na tabela, cada nome no schema mapeia uma coluna de tipo predefinido.

Na terminologia Apache Arrow, uma coleção de vetores é representada por um VectorSchemaRoot. Um VectorSchemaRoot também contém um Schema, mapeando nomes (Fields) para colunas (Vectors). Alocador de Buffer

Onde são armazenados os valores adicionados a um vetor? Um vetor Arrow é suportado por um buffer, normalmente um java.nio.ByteBuffer. Buffers são acumulados num alocador de buffer. Podemos solicitar para o alocador de buffer criar um buffer de um determinado tamanho ou deixar que o alocador de buffer cuide da criação e expansão automática para armazenar novos valores.

Um vetor é gerenciado por um alocador, por isso dizemos que o alocador é proprietário do buffer que suporta o vetor. A posse do vetor pode ser transferida de um alocador para outro.

Por exemplo, na implementação de um fluxo de dados. O fluxo consiste numa sequência de fases de processamento. Em cada fase algumas operações são executadas nos dados antes de serem passados para a próxima fase. Cada fase terá seu próprio alocador de buffer gerenciando os buffers que estão sendo processados. Uma vez que o processamento está completo os dados são entregues para a próxima fase.

Em outras palavras, a posse dos buffers que suportam os vetores é transferida para o alocador de buffer da próxima fase. Nesse momento o alocador de buffer é responsável por gerenciar e liberar a memória quando não for mais necessária.

Os buffers criados por um alocador são DirectByteBuffers, consequentemente são armazenados off-heap. Isso implica que quando acabamos de utilizar o dado, a memória deverá ser liberada. Isso pode parecer estranho para um programador Java, mas é essencial para trabalhar com o Apache Arrow. Os vetores implementam a interface AutoCloseable, por isso é recomendado incluir a criação do vetor num bloco try-with-resources que irá automaticamente fechar o vetor, ou seja, liberar a memória.

Exemplo: gravando, lendo e processando

Para concluir essa introdução, vamos utilizar um exemplo de aplicação usando o Apache Arrow. A ideia é ler um banco de dados de pessoas de um arquivo de disco, filtrar, agrupar os dados, e imprimir os resultados.

Observem que o Apache Arrow possui uma formatação in-memory. Numa aplicação real, será melhor utilizar outras formatações (colunares) que possuam armazenamento por persistência otimizado, como por exemplo, o Parquet que adiciona compactação e resumos intermediários aos arquivos gravados em disco. Como resultado a leitura e a gravação de arquivos do Parquet deverão ser mais rápidas que no Apache Arrow. O uso do Arrow neste exemplo é puramente por motivos didáticos.

Vamos imaginar uma classe Person e uma classe Address (somente exibindo partes relevantes):

public Person(String firstName, String lastName, int age, Address address) {
    this.firstName = firstName;
    this.lastName = lastName;
    this.age = age;

    this.address = address;
}

public Address(String street, int streetNumber, String city, int postalCode) {
    this.street = street;
    this.streetNumber = streetNumber;
    this.city = city;
    this.postalCode = postalCode;
}

Vamos escrever duas aplicações. A primeira aplicação irá gerar uma coleção de pessoas geradas aleatoriamente e gravá-la em disco na formatação Arrow. Depois, vamos escrever uma aplicação que lê o "banco de dados people" na formatação Arrow do disco para a memória. Selecione todas as pessoas

  • tendo um sobrenome começando com "P";
  • com idade entre 18 e 35;
  • que moram numa rua com o nome terminando em "way";

No caso das pessoas selecionadas, vamos processar a idade padrão, agrupando por cidade. Esse exemplo dará uma perspectiva de como usar o Apache Arrow para implementar análise de dados in-memory.

O código deste exemplo está disponível no repositório Git.

Gravando dados

Antes de começarmos a gravar os dados, observe que a formatação do Arrow é destinada para dados in-memory e não são otimizados para armazenamento em disco. Numa aplicação real devemos escolher formatações como a do Parquet, que suporta compactação e outros truques para acelerar o armazenamento colunar dos dados em disco. Aqui iremos gravar os dados na formatação Arrow para manter o foco na discussão.

Dada uma matriz de objetos Person, vamos começar a gravar os dados em um arquivo chamado people.arrow. O primeiro passo é converter a matriz Person de objetos para um VectorSchemaRoot. Se desejamos obter o máximo do Arrow, será necessário escrever toda a aplicação para usar vetores Arrow, mas por motivos didáticos será útil fazer essa conversão aqui.

private void vectorizePerson(int index, Person person, VectorSchemaRoot schemaRoot) {
    // Using setSafe: it increases the buffer capacity if needed
    ((VarCharVector) schemaRoot.getVector("firstName")).setSafe(index, person.getFirstName().getBytes());
    ((VarCharVector) schemaRoot.getVector("lastName")).setSafe(index, person.getLastName().getBytes());
    ((UInt4Vector) schemaRoot.getVector("age")).setSafe(index, person.getAge());

    List<FieldVector> childrenFromFields = schemaRoot.getVector("address").getChildrenFromFields();

    Address address = person.getAddress();
    ((VarCharVector) childrenFromFields.get(0)).setSafe(index, address.getStreet().getBytes());
    ((UInt4Vector) childrenFromFields.get(1)).setSafe(index, address.getStreetNumber());
    ((VarCharVector) childrenFromFields.get(2)).setSafe(index, address.getCity().getBytes());
    ((UInt4Vector) childrenFromFields.get(3)).setSafe(index, address.getPostalCode());
}

Em vectorizePerson, um objeto Person é mapeado para os vetores no schemaRoot com o schema de pessoas. O método setSafe garante que o buffer de suporte seja grande o suficiente para conter o próximo valor. Se o buffer de suporte não é grande o suficiente, ele será estendido.

Um VectorSchemaRootv é um container para um schema e uma coleção de vetores. Assim como a classe VectorSchemaRoot pode ser pensada como uma base de dados sem um schema, o schema só é conhecido quando é passado ao construtor, no instanciamento do objeto. Por isso todos os métodos, como por exemplo: getVector, têm tipos de retorno bem genéricos, FieldVector no caso. Como resultado muitas conversões são necessárias baseadas no schema ou no conhecimento do conjunto de dados.

Neste exemplo podemos optar por pré-alocar o UInt4VectorsUInt2Vector (pois sabemos com antecedência quantas pessoas existem em um lote). Então poderíamos usar o método set para evitar a checagem do tamanho do buffer e a re-alocação para a expansão do buffer.

A função vectorizePerson pode ser passada para um ChunkedWriter, que é uma abstração que manipula o particionamento e a gravação num arquivo com uma formatação binária do Arrow.
 

void writeToArrowFile(Person[] people) throws IOException {
   new ChunkedWriter<>(CHUNK_SIZE, this::vectorizePerson).write(new File("people.arrow"), people);
}

The ChunkedWriter has a write method that looks like this:
public void write(File file, Person[] values) throws IOException {
   DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();

   try (RootAllocator allocator = new RootAllocator();
        VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(personSchema(), allocator);
        FileOutputStream fd = new FileOutputStream(file);
        ArrowFileWriter fileWriter = new ArrowFileWriter(schemaRoot, dictProvider, fd.getChannel())) {
       fileWriter.start();

       int index = 0;
       while (index < values.length) {
           schemaRoot.allocateNew();
           int chunkIndex = 0;
           while (chunkIndex < chunkSize && index + chunkIndex < values.length) {
               vectorizer.vectorize(values[index + chunkIndex], chunkIndex, schemaRoot);
               chunkIndex++;
           }
           schemaRoot.setRowCount(chunkIndex);
           fileWriter.writeBatch();

           index += chunkIndex;
           schemaRoot.clear();
       }
       fileWriter.end();
   }
}

 

Vamos simplificar. Primeiro, criamos um (i) alocador, um (ii) schemaRoot, e um (iii) dictProvider. Precisamos deles para (i) alocar os buffers de memória, (ii) ser um container para os vetores (suportados por buffers), e (iii) facilitar a compressão do dicionário (podemos ignorar esse ponto no momento).

A seguir, em (2) um ArrowFileWriter é criado. Ele manipula a gravação em disco, com base em um VectorSchemaRoot. Escrever um conjunto de dados em lotes é muito fácil dessa maneira. Não menos importante, não vamos esquecer de iniciar o escritor.

O resto do método trata da vetorização da matriz Person, em partes, no vector schema root e da gravação por lote.

Qual o benefício da gravação em lotes? Em algum momento, o dado é lido do disco. Se o dado é gravado em um lote, será totalmente lido e armazenado na memória principal. A gravação em lotes, permite ao leitor processar os dados em pequenos pedaços, limitando assim o footprint de memória.

Nunca devemos esquecer de definir a contagem de valores de um vetor ou a contagem de linhas de um vetor schema root (que indiretamente define contagens de valores de todos os vetores contidos). Sem definir a contagem, um vetor aparecerá vazio, mesmo depois de valores terem sido armazenados nesse vetor.

Finalmente, quando todos os dados são armazenado em vetores, o fileWriter.writeBatch() os envia para o disco.

Uma observação sobre o gerenciamento de memória

Observe o schemaRoot.clear() e o allocator.close() nas linhas (3) e (4). O primeiro apaga todos os dados em todos os vetores contidos no VectorSchemaRoot e redefine a linha e as contagens de valor para zero. O segundo fecha o alocador e se esquecermos de liberar qualquer buffer alocado seremos informados de que existe um vazamento de memória.

Nessa configuração, o fechamento é um pouco supérfluo, pois o programa finaliza logo após o fechamento do alocador. No entanto, no mundo real, numa aplicação de vida longa o gerenciamento de memória é crítico.

As preocupações com gerenciamento de memória parecerão estranhas para programadores Java. Mas nesse caso é o preço a ser pago pela performance. Ser bastante consciente sobre buffers alocados e a liberação deles no fim do ciclo de vida.

Leitura de dados

A leitura de dados de um arquivo formatado Arrow é similar à gravação. Deve se definir um alocador, um vetor schema raiz (sem schema, ele é parte do arquivo), abra um arquivo e deixe o ArrowFileReader tomar conta do resto. Não esqueça de inicializar, pois isso é lido no Schema do arquivo.

Para ler o lote, chame o fileReader.loadNextBatch(). O próximo lote, se ainda houver um disponível, é lido do disco e os buffers de vetores no schemaRoot são preenchidos com os dados, prontos para serem processados.

O próximo fragmento de código descreve brevemente como ler um arquivo Arrow. Para cada execução do loop, um lote será carregado para dentro do VectorSchemaRoot.

O conteúdo do lote é descrito pelo VectorSchemaRoot: (i) o schema do VectorSchemaRoote (ii) o valor contado, igual ao número de entradas.

try (FileInputStream fd = new FileInputStream("people.arrow");
    ArrowFileReader fileReader = new ArrowFileReader(new SeekableReadChannel(fd.getChannel()), allocator)) {
   // Setup file reader
   fileReader.initialize();
   VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();

   // Aggregate: Using ByteString as it is faster than creating a String from a byte[]
   while (fileReader.loadNextBatch()) {
      // Processing … 
   }
}

Processando dados

Não menos importante, os passos de filtração, agrupamento e agregação devem dar uma amostra de como é trabalhar com vetores do Arrow num software de análise de dados. Definitivamente essa não é a melhor maneira de trabalhar com os Vetores do Arrow, mas é uma forma bastante sólida de iniciar a exploração do Apache Arrow. Observem o código fonte do motor de processamento Gandiva para o mundo real. O processamento de dados do Apache Arrow é um assunto bastante extenso e podemos literalmente escrever um livro sobre isso.

Observe que o código do exemplo é bastante específico para o caso de Person. Ao elaborar um processador de consultas com vetores Apache, os nomes e tipos dos vetores não são conhecidos com antecedência levando a um código mais genérico e mais difícil de entender.

Devido à formatação colunar do Arrow, podem ser aplicados passos para a filtração independente usando apenas uma coluna.

private IntArrayList filterOnAge(VectorSchemaRoot schemaRoot) {
    UInt4Vector age = (UInt4Vector) schemaRoot.getVector("age");
    IntArrayList ageSelectedIndexes = new IntArrayList();
    for (int i = 0; i < schemaRoot.getRowCount(); i++) {
        int currentAge = age.get(i);
        if (18 <= currentAge && currentAge <= 35) {
            ageSelectedIndexes.add(i);
        }
    }
    ageSelectedIndexes.trim();
    return ageSelectedIndexes;
}

Esse método coleta todos os índices nas partes ocupadas do vetor idade para os quais o valor está entre 18 e 35.

Cada filtro produz uma lista organizada com esse índices. No próximo passo, essas listas serão cruzadas/mescladas numa única lista de índices selecionados. Essa lista contém os índices de linhas que atendem a todos os critérios.

O próximo trecho de código mostra como podemos preencher facilmente estruturas de agregação de dados (mapeando cidade city para uma contagem e uma soma) de vetores da coleção de IDs selecionados.

VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");

for (int selectedIndex : selectedIndexes) {
   String city = new String(cityVector.get(selectedIndex));
   perCityCount.put(city, perCityCount.getOrDefault(city, 0L) + 1);
   perCitySum.put(city, perCitySum.getOrDefault(city, 0L) + ageDataVector.get(selectedIndex));
}

Após o preenchimento da estrutura de agregação, a impressão da idade padrão por cidade é bem fácil:

for (String city : perCityCount.keySet()) {
    double average = (double) perCitySum.get(city) / perCityCount.get(city);
    LOGGER.info("City = {}; Average = {}", city, average);
}

Conclusão

Esse artigo apresentou o Apache Arrow que possui um layout de dados com uma formatação colunar, in-memory e multilinguagem. É um bloco de construção para sistemas Big Data, focando na transferência de dados eficiente entre máquinas em um cluster e entre sistemas de Big Data diferentes. Para começar o desenvolvimento de aplicações em Java usando Apache Arrow, são usados dois exemplos de aplicações que gravam e lêem dados na formatação Arrow. Também tivemos uma primeira impressão do processamento de dados usando a biblioteca Java do Apache Arrow.

O Apache Arrow possui formatação colunar. Um layout orientado por coluna é geralmente mais adequado para cargas analíticas que layouts orientados por linhas. No entanto, sempre existem vantagens e desvantagens e para uma carga específica uma formatação orientada por linhas pode apresentar melhores resultados.

O VectorSchemaRoots, os buffers e o gerenciamento de memória não serão similares ao seu código Java nativo. Se você conseguir toda a performance necessária de um framework diferente, por exemplo: FlatBuffers, essa forma menos nativa de trabalhar poderá ser decisiva para a adoção do Apache Arrow na sua aplicação.

Sobre o Autor

Joris Gillis é desenvolvedor pesquisador na TrendMiner. A TrendMiner cria softwares analíticos self-service para séries temporais de dados em IIoT. Como desenvolvedor pesquisador ele trabalha com algoritmos de análise de escalonamento, bancos de dados de séries temporais e conectividade para fontes externas de séries de dados temporais.

Avalie esse artigo

Relevância
Estilo/Redação

Conteúdo educacional

HTML é permitido: a,b,br,blockquote,i,li,pre,u,ul,p

HTML é permitido: a,b,br,blockquote,i,li,pre,u,ul,p

BT