Flexible storage in HDFS with Flume's JSONHandler

I just found a good solution for storing data flexibly in HDFS with Flume. It is based on the JSONHandler provided with the HTTP Source.


HTTP Source

A source which accepts Flume events by HTTP POST and GET (GET should be used for experimentation only). HTTP requests are converted into Flume events by a pluggable "handler" which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events sent in one HTTP request are considered one batch and committed to the channel in a single transaction, allowing for increased efficiency on channels like the file channel. If the handler throws an exception, the source returns an HTTP 400 status. If the channel is full, or the source is unable to append events to the channel, the source returns an HTTP 503 (temporarily unavailable) status.

JSONHandler
A handler is provided out of the box which can handle events represented in JSON format, and supports the UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even a single event must be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.
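This example is taken from the Flume documentation; the header names and body values are arbitrary placeholders:

[{
  "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
  },
  "body" : "random_body"
},
{
  "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
  },
  "body" : "really_random_body"
}]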

Flume config file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 44448
# JSONHandler is the default handler, so this line is optional
#a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# Describe the sink
a1.sinks.k1.type = hdfs
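# %{header} escapes in the path are replaced at write time with the value of
# the corresponding event header, so each event chooses its own directory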
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/data/%{m_user}/%{m_year}/%{m_month}/%{m_day}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
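# Note: round/roundValue/roundUnit only affect time-based escape sequences
# (%Y, %m, %H, ...); the path above uses only %{header} escapes, so the
# rounding settings have no effect here and could be dropped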

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
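Assuming the configuration above is saved as example.conf in the conf directory (the file name is arbitrary), the agent can be started with:

flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console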

Test: put data with curl

curl -X POST -d '[{
  "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com",
    "field1" : "val1",
    "m_user" : "m_user",
    "m_year" : "m_year",
    "m_month" : "m_month",
    "m_day" : "m_day"
  },
  "body" : "random_body"
}]' localhost:44448
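Given the headers in this request, the sink resolves the path escapes to hdfs://localhost:8020/data/m_user/m_year/m_month/m_day and writes the event body there in a file prefixed with events-. Sending different m_user, m_year, m_month and m_day header values routes events to different directories, which is what makes this setup flexible.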

