BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Performance Tuning Techniques of Hive Big Data Table

Performance Tuning Techniques of Hive Big Data Table

Bookmarks

Key Takeaways

  • Developers working on big data applications experience challenges when reading data from Hadoop file systems or Hive tables.
  • Consolidation job, a technique used to merge smaller files to bigger files, can help with the performance of reading Hadoop data.
  • With consolidation, the number of files is significantly reduced and query time to read the data will be faster.
  • Hive tuning parameters can also help with performance when you read Hive table data through a map-reduce job.

Hive table is one of the big data tables which relies on structural data. By default, it stores the data in a Hive warehouse. To store it at a specific location, the developer can set the location using a location tag during the table creation. Hive follows the same SQL concepts like row, columns, and schema.

Developers working on big data applications have a prevalent problem when reading Hadoop file systems data or Hive table data. The data is written in Hadoop clusters using spark streaming, Nifi streaming jobs, or any streaming or ingestion application. A large number of small data files are written in the Hadoop Cluster by the ingestion job. These files are also called part files.

These part files are written across different data nodes, and when the number of files increases in the directory, it becomes tedious and a performance bottleneck if some other app or user tries to read this data. One of the reasons is that the data is distributed across nodes. Think about your data residing in multiple distributed nodes. The more scattered it is, the job takes around “N * (Number of files)” time to read the data, where N is the number of nodes across each Name Nodes. For example, if there are 1 million files, when we run the MapReduce job, the mapper has to run for 1 million files across data nodes and this will lead to full cluster utilization leading to performance issues.

For beginners, the Hadoop cluster comes with several Name Nodes, and each Name Node will have multiple Data Nodes. Ingestion/Streaming jobs write data across multiple data nodes, and it has performance challenges while reading those data. The job which reads the data will take a considerable time for developers to figure out the problem associated with the query response time. This problem mostly occurs for clients whose data is in 100’s of millions in volume every day. For smaller datasets, this performance technique may not be necessary, but it is always good to do some additional tuning for the long run.

In this article, I’ll discuss how to tackle these problems and techniques for performance tuning to improve data access from Hive tables. Hive, similar to other big data technologies like Cassandra and Spark is a very powerful solution but requires tuning by data developers and operations teams to get optimum performance out of the queries executed against Hive data.

Let’s first look at some use cases of Hive data usage.

Use Cases

Hive data is predominantly used in the following applications:

  • Big Data Analytics, running analytics reports on transaction behavior, activity, volume, and more
  • Tracking fraudulent activity and generating reports on this activity
  • Creating dashboards based on the data
  • Auditing purposes and a store for historical data
  • Feeding data for Machine learning and building intelligence around it

Tuning Techniques

There are several ways to ingest data into Hive tables. Ingestion can be done through an Apache Spark streaming job,Nifi, or any streaming technology or application. The data which gets ingested is raw data, and it’s very important to consider all tuning factors before the ingestion process begins.

Organizing Hadoop Data

The first step is to organize the Hadoop data. We begin with ingestion/streaming jobs. First, the data needs to be partitioned. The most basic way to partition data is by day or hourly. It may even be worthwhile to have two partition—days and hours. In some cases, you can partition within a day by some country, region, or something that fits your data and use case. For example, think about a library shelf, where books are arranged based on genre, and each genre is set in a child or adult section.

Figure 1: Data Organized

So, we take this example, we write data in Hadoop directory like so:

hdfs://cluster-uri/app-path/category=children/genre=fairytale OR
hdfs://cluster-uri/app-path/category=adult/genre=thrillers

In this way, your data is more organized.

In the most common case, data is partitioned by day or hour in case of no specific use-cases

hdfs ://cluster-uri/app-path/day=20191212/hr=12

or just a day partition depending on the requirement.

hdfs://cluster-uri/app-path/day=20191212

Figure 2: Ingestion Flow into Partition folder

Hadoop Data Format

When creating a Hive table, it is good to provide table compress properties like zlib and format like orc. And while ingesting, these data will be written in these formats. If your application is writing in plain Hadoop file systems, it is advised to provide the format. Most of the ingestion frameworks like Spark or Nifi have a way to specify the format. Specifying the data format helps make the data more organized in a compressed format which saves space in the Cluster.

Consolidation Job

The Consolidation job plays a crucial role in improving the performance of the overall read of Hadoop data. There are several parts associated with the consolidation technique. By default, the files written in hdfs directories are small part files and when there are too many part files, there will be performance issues while reading the data. Consolidation isn't any particular feature of Hive—it is a technique used to merge smaller files into bigger files. Consolidation technique isn’t covered anywhere online, so this particular technique is very important especially when any batch applications read the data.

What Is the Consolidation Job?

By default, ingestion/streaming jobs writing to Hive, directories write into small part files, and in a day for high volume applications, these files will be more than 100,000+ depending on volume. The real problem comes when we try to read the data, it takes a lot of time, sometimes several hours, to eventually return the result or the job can fail. For instance, let’s assume you have a day partition directory, and you need to process around 1 million small files. For example, if run count:

#Before:
hdfs dfs -count -v /cluster-uri/app-path/day=20191212/*
Output = 1Million

Now, after running the Consolidation job, the number of files will be reduced significantly. It merges all small part files into large size files.

#After:
hdfs dfs -count -v /cluster-uri/app-path/day=20191212/*
Output = 1000

Note: cluster-uri varies organization by organization, it’s a Hadoop cluster uri to connect to your specific cluster.

How Consolidation Job Helps

Consolidation of files is essential not just for performance sake but also for cluster healthiness. As per Hadoop platform guidelines, there shouldn’t be so many files lying in the nodes. Having too many files will cause too many nodes to read and attribute to high latency. Remember, when to read Hive data, it scans across all data nodes. If you have too many files, then read time spreads accordingly. So, it is essential to merge all those small files into bigger files. Also, it is necessary to have purge routines if data isn’t needed after certain days.

How Consolidation Works

There are several ways to do the consolidation of files. It mainly depends on where you are writing the data. Below I will discuss different common use cases.

  • Writing data using Spark or Nifi to Hive tables in the daily partition folder
  • Writing data using Spark or Nifi to Hadoop file system (HDFS)

Here, in this case, huge files would be written in the daily folder. The developer needs to follow any below options.

Figure 3: Consolidation Logic

  1. Write a script to perform the consolidation. The script takes parameters like day and performs Hive select data from the same partition data and inserts overwrite in the same partition. Here, when Hive re-writes data in the same partition, it runs a map-reduce job and reduces the number of files.
  1. Sometimes, overwriting the same data in the same command may leave us with unexpected data loss if the command fails. In this case, select the data from the daily partition and write it in a temporary partition. If it is successful, then move the temporary partition data to the actual partition using the load command. This step is illustrated in Figure 3.

Between these two options, option B is better, which fits all the use-cases and is most efficient. Option B is efficient because there is no data loss if any step fails. Developers can write a control m and schedule it to run at next day around midnight when there are no active users reading data.

There is one use case where the developer need not write a Hive query. Instead, submit a spark job and select the same partition and overwrite the data, but this is recommended only when the number of files is not huge in the partition folder and spark can still read the data without over-specifying resources. This option fits for low volume use cases, and this extra step can boost the performance of reading the data.

How Does the Entire Flow Work?

Let’s take one example use-case to go over all the pieces.

Assume you own an e-commerce app, you have the process to track daily customer volume by different purchasing categories. Your app is very high volume and you need a smart data analytics set up based on customer purchasing habits and history.

From the presentation layer to the mid-tier layer, you want to publish these messages using Kafka or IBM MQ. The next piece is to have one streaming app that consumes Kafka/MQ and ingests into Hadoop Hive tables. Through Nifi or Spark, this can be achieved. Before doing this, the Hive table needs to be designed and created. During the Hive table creation, you need to decide what your partition column looks like and if any sorting is required or any compression algorithm like Snappy or Zlib is needed to be applied.

The Hive table design is a crucial aspect of determining overall performance. You must consider how data is going to be queried based on how that design has to be applied. If you want to query daily how many customers had purchased items in a particular category like Toys, Furniture, etc., it is advisable to have two partitions at most, like a day partition and one as a category partition. The streaming app should then ingest the data accordingly.

Having all the usability aspects beforehand gives you better chances of designing tables to suit your needs. So once data is ingested into this table, data should be organized into day and category partitions for the above example.

Only ingested data will be small files in Hive location, so as explained above, it becomes vital to consolidate those files.

As the next part of your process, you can set up a scheduler or use a control M to run daily the Consolidation job nightly, like around 1 AM, which will call the consolidation scripts. Those scripts will consolidate the data for you. Finally, in those Hive locations, you should see the number of files reduced.

When the real smart data analytics runs for the previous day, it will be easy to query with better performance.

Hive Parameter Settings

When you read Hive table data through a map-reduce job certain tuning parameters can be handy. These tuning parameters are already discussed by the technology. Click the link to read more about Hive tuning parameters.

Set hive.exec.parallel = true;
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set mapred.compress.map.output = true;
set mapred.output.compress= true;
Set hive.execution.engine=tez;

To learn more about each of the properties, you can refer to the existing tutorial.

Technical Implementation

Now, let’s take one use case example and show it step by step. Here, I am considering ingesting customer events data into the Hive table. My downstream systems or team will further use this data to run analytics (such as, in a day, what items did customers purchase and from which city?). This data will be used to analyze the demographics of my product users, which will allow me to troubleshoot or expand business use cases. This data can further enable us to understand where my active customer is from and how I can do more to increase my business.

Step 1: Create a sample Hive table. Here is the code snippet:

Step 2: Set up a streaming job to ingest into the Hive table.

This streaming job can spark streaming from Kafka’s real-time data and then transform and ingest it into the Hive table.

Figure 4: Hive Data Flow

So, when live data is ingested, the data will be written in day partitions. Let’s assume today’s date is 20200101.

hdfs dfs -ls /data/customevents/day=20200101/

/data/customevents/day=20200101/part00000-djwhu28391
/data/customevents/day=20200101/part00001-gjwhu28e92
/data/customevents/day=20200101/part00002-hjwhu28342
/data/customevents/day=20200101/part00003-dewhu28392
/data/customevents/day=20200101/part00004-dfdhu24342
/data/customevents/day=20200101/part00005-djwhu28fdf
/data/customevents/day=20200101/part00006-djwffd8392
/data/customevents/day=20200101/part00007-ddfdggg292

By the End of the day, depending upon the traffic of your application, the number could be anywhere between 10K to 1M. For large-scale companies the volume will be high. Let’s assume the total number of files was 141K.

Step 3: Running the Consolidation job

On 2020-01-02, i.e., the next day, around 1 AM, we should run the Consolidation job. The sample code is uploaded in git. The file name is consolidation.sh.

Below is command to run in your edge node/box

./consolidate.sh 20200101

This script will now consolidate previous day data. After it finishes, you can rerun the count.

hdfs dfs -count -v /data/customevents/day=20200101/*
count = 800

So, before it was 141K, and after consolidation, the count is 800. So, this will give you significant performance benefits.

Link for consolidation logic code.

Statistics

Without applying any tuning technique, the query time to read Hive table data will take anywhere between 5 mins to several hours depending upon volume.

Figure 5: Statistics

After consolidation, the query time significantly reduces, and we get results faster. The number of files will be significantly reduced and the query time to read the data will decrease. Without consolidation, queries run on so many small files that spread across the name nodes and lead to an increase in response time.

References

About the Author

Sudhish Koloth is a Lead developer working with a Banking and Financial services company. He has spent over 13 years working in information technology. He worked in various technologies including full-stack, big data, automation, and android development. He also played a significant role in delivering important impactful projects during the COVID-19 pandemic. Sudhish uses his expertise to solve common problems faced by humanity and is a volunteer and provides help for non-profit applications. He is also a mentor who helps fellow professionals and colleagues with his technical expertise. Mr. Sudhish is also an active preacher and motivator of Stem education’s importance to school-age children and young college graduates. He has been recognized for his work inside and outside of his career network.

Rate this Article

Adoption
Style

BT