
Using Apache Avro

Avro[1] is a recent addition to Apache's Hadoop family of projects. Avro defines a data format designed to support data-intensive applications, and provides support for this format in a variety of programming languages.

Avro provides functionality similar to other marshalling systems such as Thrift and Protocol Buffers. The main differentiators of Avro include[2]:

  • "Dynamic typing: Avro does not require that code be generated. Data is always accompanied by a schema that permits full processing of that data without code generation, static datatypes, etc. This facilitates construction of generic data-processing systems and languages.
  • Untagged data: Since the schema is present when data is read, considerably less type information need be encoded with data, resulting in smaller serialization size.
  • No manually-assigned field IDs: When a schema changes, both the old and new schema are always present when processing data, so differences may be resolved symbolically, using field names."

Due to its very high performance, small codebase, and compact output, there seems to be a lot of activity around Avro: many NoSQL implementations, including Hadoop and Cassandra, are integrating it into both their client APIs and storage; there are benchmarking results[3] comparing Avro with other popular serialization frameworks; yet there are virtually no code examples[4] that one can use to learn how to use Avro.

In this article I will describe my experiments with Avro, specifically:

  • How to componentize Avro schemas and build an overall schema out of components maintained separately in multiple files
  • Implementing inheritance in Avro
  • Implementing polymorphism in Avro
  • Backward compatibility of Avro documents

Componentizing Apache Avro Schemas

As described in the Avro specification[5], an Avro document schema is defined as a JSON file. In the current Avro implementation, the Schema class takes a single file (or string) to build the internal schema representation. Unlike XML Schema, the current version of Avro does not support importing subschema(s) into a schema document, which often forces developers to write very complex schema definitions[6] and significantly complicates schema reuse. Schema files can nevertheless be split and combined, based on the fact that the Schema class provides a toString() method, which returns a JSON string representing the given schema definition. Exploiting this, I have written a simple AvroUtils class that does such combining automatically:

package com.navteq.avro.common;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;

public class AvroUtils {

    // Global (static) registry of fully qualified schema names to Schema objects
    private static Map<String, Schema> schemas = new HashMap<String, Schema>();

    private AvroUtils() {}

    public static void addSchema(String name, Schema schema) {
        schemas.put(name, schema);
    }

    public static Schema getSchema(String name) {
        return schemas.get(name);
    }

    // Replace every occurrence of an already registered schema name
    // with the full JSON definition of that schema
    public static String resolveSchema(String sc) {
        String result = sc;
        for (Map.Entry<String, Schema> entry : schemas.entrySet())
            result = replace(result, entry.getKey(), entry.getValue().toString());
        return result;
    }

    static String replace(String str, String pattern, String replace) {
        int s = 0;
        int e = 0;
        StringBuffer result = new StringBuffer();
        while ((e = str.indexOf(pattern, s)) >= 0) {
            result.append(str.substring(s, e));
            result.append(replace);
            s = e + pattern.length();
        }
        result.append(str.substring(s));
        return result.toString();
    }

    public static Schema parseSchema(String schemaString) {
        String completeSchema = resolveSchema(schemaString);
        Schema schema = Schema.parse(completeSchema);
        String name = schema.getFullName();
        schemas.put(name, schema);
        return schema;
    }

    public static Schema parseSchema(InputStream in) throws IOException {
        StringBuffer out = new StringBuffer();
        byte[] b = new byte[4096];
        for (int n; (n = in.read(b)) != -1;) {
            out.append(new String(b, 0, n));
        }
        return parseSchema(out.toString());
    }

    public static Schema parseSchema(File file) throws IOException {
        FileInputStream fis = new FileInputStream(file);
        try {
            return parseSchema(fis);
        } finally {
            fis.close();
        }
    }
}

Listing 1 AvroUtils class

This simple implementation is based on a global (static) schema registry, implemented as a map from fully qualified schema names to the corresponding Schema objects. For every new schema it parses, the implementation looks for fully qualified names of schemas already stored in the registry and replaces each occurrence of such a name in the given schema string with the corresponding schema definition. After the schema string is parsed, its full name and the resulting Schema object are stored in the registry.

A simple test, showing how to use this class, is presented below:

package com.navteq.avro.common;

import org.apache.avro.Schema;
import org.junit.Test;

public class AvroUtilsTest {

    private static final String schemaDescription =
        "{ \n" +
        " \"namespace\": \"com.navteq.avro\", \n" +
        " \"name\": \"FacebookUser\", \n" +
        " \"type\": \"record\",\n" +
        " \"fields\": [\n" +
        " {\"name\": \"name\", \"type\": [\"string\", \"null\"] },\n" +
        " {\"name\": \"num_likes\", \"type\": \"int\"},\n" +
        " {\"name\": \"num_photos\", \"type\": \"int\"},\n" +
        " {\"name\": \"num_groups\", \"type\": \"int\"} ]\n" +
        "}";

    private static final String schemaDescriptionExt =
        "{ \n" +
        " \"namespace\": \"com.navteq.avro\", \n" +
        " \"name\": \"FacebookSpecialUser\", \n" +
        " \"type\": \"record\",\n" +
        " \"fields\": [\n" +
        " {\"name\": \"user\", \"type\": com.navteq.avro.FacebookUser },\n" +
        " {\"name\": \"specialData\", \"type\": \"int\"} ]\n" +
        "}";

    @Test
    public void testParseSchema() throws Exception {

        // Parsing the base schema registers it under its full name,
        // so the reference in the extended schema can be resolved
        AvroUtils.parseSchema(schemaDescription);
        Schema extended = AvroUtils.parseSchema(schemaDescriptionExt);
        System.out.println(extended.toString(true));
    }
}

Listing 2 AvroUtils test

In this test, the fully qualified name of the first schema is com.navteq.avro.FacebookUser, so the substitution works correctly and the execution prints out the following result:

{
  "type" : "record",
  "name" : "FacebookSpecialUser",
  "namespace" : "com.navteq.avro",
  "fields" : [ {
    "name" : "user",
    "type" : {
      "type" : "record",
      "name" : "FacebookUser",
      "fields" : [ {
        "name" : "name",
        "type" : [ "string", "null" ]
      }, {
        "name" : "num_likes",
        "type" : "int"
      }, {
        "name" : "num_photos",
        "type" : "int"
      }, {
        "name" : "num_groups",
        "type" : "int"
      } ]
    }
  }, {
    "name" : "specialData",
    "type" : "int"
  } ]
}

Listing 3 Result of AvroUtilsTest execution

Implementing Inheritance using Apache Avro

A common approach to defining data is through inheritance – taking an existing data definition and adding parameters. Although technically Avro does not support inheritance[7], it is simple enough to implement an inheritance-like structure.

If we have a definition of a base class, FacebookUser, which looks as follows:

{
"namespace": "com.navteq.avro",
"name": "FacebookUser",
"type": "record",
"fields": [
  {"name": "name", "type": ["string", "null"] },
  {"name": "num_likes", "type": "int"},
  {"name": "num_photos", "type": "int"},
  {"name": "num_groups", "type": "int"} ]
} 

Listing 4 Definition of a Facebook user record

It is simple enough to create a definition of FacebookSpecialUser, which might look as follows:

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUser",
  "type": "record",
  "fields": [
    {"name": "user", "type": com.navteq.avro.FacebookUser },
    {"name": "specialData", "type": "int"}
  ]
}

Listing 5 Definition of a Facebook special user record

Here the special user definition contains two fields: user, of type FacebookUser, and specialData, of type int.

A simple test class for the special Facebook user is presented below:

package com.navteq.avro.inheritance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;

import com.navteq.avro.common.AvroUtils;

public class TestSimpleInheritance {

        private Schema schema;
        private Schema subSchema;

        @Before
        public void setUp() throws Exception {

                subSchema = AvroUtils.parseSchema(new File("resources/facebookUser.avro"));
                schema = AvroUtils.parseSchema(new File("resources/FacebookSpecialUser.avro"));

        }

        @Test
        public void testSimpleInheritance() throws Exception{
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                GenericDatumWriter<GenericRecord> writer =
                            new GenericDatumWriter<GenericRecord>(schema);
                Encoder encoder = new BinaryEncoder(outputStream);

                GenericRecord subRecord1 = new GenericData.Record(subSchema);
                subRecord1.put("name", new Utf8("Doctor Who"));
                subRecord1.put("num_likes", 1);
                subRecord1.put("num_photos", 0);
                subRecord1.put("num_groups", 423);
                GenericRecord record1 = new GenericData.Record(schema);
                record1.put("user", subRecord1);
                record1.put("specialData", 1);

                writer.write(record1, encoder);

                GenericRecord subRecord2 = new GenericData.Record(subSchema);
                subRecord2.put("name", new org.apache.avro.util.Utf8("Doctor WhoWho"));
                subRecord2.put("num_likes", 2);
                subRecord2.put("num_photos", 0);
                subRecord2.put("num_groups", 424);
                GenericRecord record2 = new GenericData.Record(schema);
                record2.put("user", subRecord2);
                record2.put("specialData", 2);

                writer.write(record2, encoder);

                encoder.flush();

                ByteArrayInputStream inputStream =
                        new ByteArrayInputStream(outputStream.toByteArray());
                Decoder decoder = DecoderFactory.defaultFactory().
                        createBinaryDecoder(inputStream, null);
                GenericDatumReader<GenericRecord> reader =
                        new GenericDatumReader<GenericRecord>(schema);
                while(true){
                        try{
                              GenericRecord result = reader.read(null, decoder);
                              System.out.println(result);
                        }
                        catch(EOFException eof){
                                break;
                        }
                        catch(Exception ex){
                                ex.printStackTrace();
                        }
                }
        }
}

Listing 6 A test class for a special Facebook user[8]

Running this test class produces the expected result:

{"user": {"name": "Doctor Who", "num_likes": 1, "num_photos": 0,
"num_groups": 423}, "specialData": 1}
{"user": {"name": "Doctor WhoWho", "num_likes": 2, "num_photos": 0,
"num_groups": 424}, "specialData": 2}

Listing 7 Result of execution of special Facebook user test

This code works fine if the only thing required is a record containing the base one plus some extra parameters, but it does not provide polymorphism: one cannot read a record and decide which type was actually read.

Implementing Polymorphism using Apache Avro

Because, unlike Google protocol buffers[9], Avro does not support optional parameters[10], the implementation of inheritance described above is not suitable for implementing polymorphism: the specialData parameter always has to be present. Fortunately, Avro supports unions, which allow some of a record's fields to be omitted. The following definitions can be used to create a polymorphic record. For the base record I will use the one described in Listing 4. For extensions we will use the following two definitions:

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUserExtension1",
  "type": "record",
  "fields": [
    {"name": "specialData1", "type": "int"}
  ]
}

Listing 8 Definition of first extension record

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUserExtension2",
  "type": "record",
  "fields": [
    {"name": "specialData2", "type": "int"}
  ]
}

Listing 9 Definition of second extension record

With these two definitions in place, a polymorphic record can be defined as follows:

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUser",
  "type": "record",
  "fields": [
    {"name": "type", "type": "string" },
    {"name": "user", "type": com.navteq.avro.FacebookUser },
    {"name": "extension1", "type":
        [com.navteq.avro.FacebookSpecialUserExtension1, "null"]},
    {"name": "extension2", "type":
        [com.navteq.avro.FacebookSpecialUserExtension2, "null"]}
  ]
}

Listing 10 Polymorphic definition of Facebook Special user

Here both extension1 and extension2 are optional and either one can be present. To make processing even simpler, I have added a type field, which can be used to explicitly define the type of the record; a sketch of dispatching on this field follows.
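For example, a consumer can dispatch on the type field after deserialization. A minimal sketch, where the process* methods are hypothetical placeholders:

// Read a polymorphic record and dispatch on its explicit type field
GenericRecord result = reader.read(null, decoder);
String type = result.get("type").toString();
if ("base".equals(type))
    processBase((GenericRecord) result.get("user"));
else if ("extension1".equals(type))
    processExtension1((GenericRecord) result.get("extension1"));
else if ("extension2".equals(type))
    processExtension2((GenericRecord) result.get("extension2"));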

An even better definition of a polymorphic record is presented below:

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUser1",
  "type": "record",
  "fields": [
    {"name": "type", "type": "string" },
    {"name": "user", "type": com.navteq.avro.FacebookUser },
    {"name": "extension", "type":
        [com.navteq.avro.FacebookSpecialUserExtension1,
         com.navteq.avro.FacebookSpecialUserExtension2,
         "null"]}
  ]
}

Listing 11 Improved polymorphic definition of Facebook Special user
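With a single union-typed extension field, the type field is, strictly speaking, redundant: once a record has been read, the actual branch of the union can be discovered from the schema of the nested record. A minimal sketch:

// result is a GenericRecord read with the Listing 11 schema
GenericRecord ext = (GenericRecord) result.get("extension");
if (ext != null) {
    // e.g. "com.navteq.avro.FacebookSpecialUserExtension1"
    String actualType = ext.getSchema().getFullName();
}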

A simple test class for the polymorphic special Facebook user is presented below:

package com.navteq.avro.inheritance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;

import com.navteq.avro.common.AvroUtils;

public class TestInheritance {

        private Schema FBUser;
        private Schema base;
        private Schema ext1;
        private Schema ext2;

        @Before
        public void setUp() throws Exception {

                 base = AvroUtils.parseSchema(new File("resources/facebookUser.avro"));
                 ext1 = AvroUtils.parseSchema(
                         new File("resources/FacebookSpecialUserExtension1.avro"));
                 ext2 = AvroUtils.parseSchema(
                         new File("resources/FacebookSpecialUserExtension2.avro"));
                 FBUser = AvroUtils.parseSchema(new File("resources/FacebooklUserInheritance.avro"));
        }

        @Test
        public void testInheritanceBinary() throws Exception{
                 ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                 GenericDatumWriter<GenericRecord> writer =
                         new GenericDatumWriter<GenericRecord>(FBUser);
                 Encoder encoder = new BinaryEncoder(outputStream);

                 GenericRecord baseRecord = new GenericData.Record(base);
                 baseRecord.put("name", new Utf8("Doctor Who"));
                 baseRecord.put("num_likes", 1);
                 baseRecord.put("num_photos", 0);
                 baseRecord.put("num_groups", 423);
                 GenericRecord FBrecord = new GenericData.Record(FBUser);
                 FBrecord.put("type", "base");
                 FBrecord.put("user", baseRecord);

                 writer.write(FBrecord, encoder);

                 baseRecord = new GenericData.Record(base);
                 baseRecord.put("name", new Utf8("Doctor WhoWho"));
                 baseRecord.put("num_likes", 1);
                 baseRecord.put("num_photos", 0);
                 baseRecord.put("num_groups", 423);
                 GenericRecord extRecord = new GenericData.Record(ext1);
                 extRecord.put("specialData1", 1);
                 FBrecord = new GenericData.Record(FBUser);
                 FBrecord.put("type", "extension1");
                 FBrecord.put("user", baseRecord);
                 FBrecord.put("extension", extRecord);

                 writer.write(FBrecord, encoder);

                 baseRecord = new GenericData.Record(base);
                 baseRecord.put("name", new org.apache.avro.util.Utf8("Doctor WhoWhoWho"));
                 baseRecord.put("num_likes", 2);
                 baseRecord.put("num_photos", 0);
                 baseRecord.put("num_groups", 424);
                 extRecord = new GenericData.Record(ext2);
                 extRecord.put("specialData2", 2);
                 FBrecord = new GenericData.Record(FBUser);
                 FBrecord.put("type", "extension2");
                 FBrecord.put("user", baseRecord);
                 FBrecord.put("extension", extRecord);

                 writer.write(FBrecord, encoder);

                 encoder.flush();

                 byte[] data = outputStream.toByteArray();
                 ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
                 Decoder decoder =
                        DecoderFactory.defaultFactory().createBinaryDecoder(inputStream, null);
                 GenericDatumReader<GenericRecord> reader =
                        new GenericDatumReader<GenericRecord>(FBUser);
                 while(true){
                        try{
                               GenericRecord result = reader.read(null, decoder);
                               System.out.println(result);
                        }
                        catch(EOFException eof){
                               break;
                        }
                        catch(Exception ex){
                               ex.printStackTrace();
                        }
                 }
        }
}

Listing 12 A test class for a polymorphic Facebook user record

Running this test class produces the expected result:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": null}
{"type": "extension1", "user": {"name": "Doctor WhoWho", "num_likes": 1,
"num_photos": 0, "num_groups": 423}, "extension": {"specialData1": 1}}
{"type": "extension2", "user": {"name": "Doctor WhoWhoWho", "num_likes": 2,
"num_photos": 0, "num_groups": 424}, "extension": {"specialData2": 2}} 

Listing 13 Result of execution of polymorphic Facebook user record test

Backward Compatibility using Apache Avro

One of the advantages of XML is backward compatibility of data when the schema definition is extended with optional parameters. To test this feature for Avro we will introduce a definition of a third extension record:

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUserExtension3",
  "type": "record",
  "fields": [
    {"name": "specialData3", "type": "int"}
  ]
}

Listing 14 Definition of third extension record

And the changed definition of the polymorphic record is presented below:

{
  "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUser11",
  "type": "record",
  "fields": [
    {"name": "type", "type": "string" },
    {"name": "user", "type": com.navteq.avro.FacebookUser },
    {"name": "extension", "type":
        [com.navteq.avro.FacebookSpecialUserExtension1,
         com.navteq.avro.FacebookSpecialUserExtension2,
         com.navteq.avro.FacebookSpecialUserExtension3,
         "null"]}
  ]
}

Listing 15 Extended polymorphic definition of Facebook special user

When the code in Listing 12 was modified to read records using the record definition in Listing 15 (while still writing them using the record definition in Listing 11), it produced the following result:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": {"specialData3": 10}}
java.lang.ArrayIndexOutOfBoundsException
      at java.lang.System.arraycopy(Native Method)
      at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:331)
      at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:265)
      at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:99)
      at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:318)
      at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:312)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:120)
      at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:142)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:114)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:105)
      at com.navteq.avro.inheritance.TestInheritance.testInheritanceBinary(TestInheritance.java:119)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)

Listing 16 Result of execution of polymorphic Facebook user record test with extended definition

Avro does provide an API that can address this problem: the GenericDatumReader<GenericRecord> constructor can take two parameters, the schema that was used to write the record and the schema that is used to read it, and resolve the differences between them (see the sketch below). However, this is not always a viable solution for backward compatibility, because it forces you to keep track of the schema that was used to write every record.
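A minimal sketch of this approach, assuming writerSchema holds the schema the records were written with (Listing 11) and readerSchema the extended schema (Listing 15):

// Resolve records written with the old schema against the new one
GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
GenericRecord result = reader.read(null, decoder);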

A more appropriate solution can be achieved by switching from the binary encoder/decoder, which creates a binary representation of the record, to the JSON encoder/decoder, which creates a JSON representation. The change is sketched below.
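The change amounts to replacing the construction of the encoder and decoder in Listing 12. A minimal sketch, assuming the direct JsonEncoder/JsonDecoder constructors of Avro 1.4.x (already imported in Listing 12); extendedSchema is a hypothetical variable holding the Listing 15 schema:

// Write the records as JSON text rather than binary
Encoder encoder = new JsonEncoder(FBUser, outputStream);
// ... write and flush the records exactly as in Listing 12 ...

// Read them back, this time against the extended schema
Decoder decoder = new JsonDecoder(extendedSchema, inputStream);

In this case, the code works, producing the following result: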

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": null}
{"type": "extension1", "user": {"name": "Doctor WhoWho", "num_likes": 1,
"num_photos": 0, "num_groups": 423}, "extension": {"specialData1": 1}}
{"type": "extension2", "user": {"name": "Doctor WhoWhoWho", "num_likes": 2,
"num_photos": 0, "num_groups": 424}, "extension": {"specialData2": 2}}

Listing 17 Result of execution of polymorphic Facebook user record test with extended definition and JSON encoding

In the case of the JSON encoder, the actual data is converted to JSON:

{"type":"base","user":{"name":{"string":"Doctor
Who"},"num_likes":1,"num_photos":0,"num_groups":423},"extension":null}
{"type":"extension1","user":{"name":{"string":"Doctor
WhoWho"},"num_likes":1,"num_photos":0,"num_groups":423},"extension":{"FacebookSpecialUserExtension1":{"specialData1":1}}}
{"type":"extension2","user":{"name":{"string":"Doctor
WhoWhoWho"},"num_likes":2,"num_photos":0,"num_groups":424},"extension":{"FacebookSpecialUserExtension2":{"specialData2":2}}}

Listing 18 Converted data in the case of JSON encoding

An additional consideration here is that in my test the same data produced 89 bytes per Avro record in the case of binary encoding and 473 bytes in the case of JSON encoding.

Conclusion

The current implementation of Avro does not directly support schema componentization or reuse of schema components. The simple framework presented in this article provides support for these features. Although Avro does not directly support polymorphism either, proper schema design, as presented in this article, allows one to easily implement polymorphic data schemas. When it comes to true backward compatibility, Avro supports it only if JSON encoding is used[11]; this is not so much a feature of Avro as of JSON itself. This limitation severely restricts Avro's applicability where backward compatibility is a requirement, reducing its usage to that of a high-level API for JSON marshalling and processing.

In addition to the generic Avro approach (used here), a specific Avro approach can be used, in which, instead of a generic record, a specific record class is generated (by Avro) based on a record definition. Despite the claims[12] of performance gains for specific Avro usage, my experiments with the current version of Avro (1.4.1) showed about the same performance for both.
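For illustration, writing a record with the specific approach might look like the following sketch. It assumes a FacebookUser class generated by the Avro compiler from the schema in Listing 4 (in Avro 1.4.x, generated classes expose public fields and a SCHEMA$ constant), with an Encoder constructed as in the earlier listings:

// FacebookUser is assumed to be generated by the Avro schema compiler
FacebookUser user = new FacebookUser();
user.name = new Utf8("Doctor Who"); // exact field types depend on the generated code
user.num_likes = 1;
user.num_photos = 0;
user.num_groups = 423;

SpecificDatumWriter<FacebookUser> writer =
        new SpecificDatumWriter<FacebookUser>(FacebookUser.SCHEMA$);
writer.write(user, encoder);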


[1] http://hadoop.apache.org/avro/

[2] http://avro.apache.org/docs/1.4.1/

[3] http://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking

[4] The few that I have found are at Avro marshalling and Avro Map Reduce

[5] http://avro.apache.org/docs/current/spec.html

[6] Interestingly enough, Avro IDL includes support for child IDL

[7] Unlike XML, which explicitly supports base types in the type definition.

[8] One of the things to note about the above code is that schema parsing is done once, in the setUp method, rather than in the test itself. The reason for this is that schema parsing is the most expensive operation in the Avro implementation.

[9] http://code.google.com/p/protobuf/

[10] Avro supports "null", which is not the same as an optional parameter: "null" in Avro specifies that no value is provided for a given attribute.

[11] Or if an old version of the schema is available

[12] http://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking