How to insert JSON in HDFS using Flume correctly

I am using the HTTPSource in Flume to receive POST events in JSON format, as follows:

{"username":"xyz","password":"123"}

My question is: do I have to modify the source of the events (I mean, the process that is sending the JSON to Flume) so that the JSON has the following format:

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "{"username":"xyz","password":"123"}"
}]

Is this the best way to do it? Or can I modify it somewhere else?

My conf file for the flume agent is:

## Components
SomeAgent.sources = SomeHTTP
SomeAgent.channels = MemChannel
SomeAgent.sinks = SomeHDFS

## Source and Interceptors
SomeAgent.sources.SomeHTTP.type = http
SomeAgent.sources.SomeHTTP.port = 5140
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler
SomeAgent.sources.SomeHTTP.channels = MemChannel
SomeAgent.sources.SomeHTTP.interceptors = i1 i2

## Interceptors
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname

## Channel
SomeAgent.channels.MemChannel.type = memory
SomeAgent.channels.MemChannel.capacity = 10000
SomeAgent.channels.MemChannel.transactionCapacity = 1000

## Sink
SomeAgent.sinks.SomeHDFS.type = hdfs
SomeAgent.sinks.SomeHDFS.channel = MemChannel
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs-
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true

Listing and cat-ing the files with hadoop fs:

$ hadoop fs -ls -R /raw/logs/somes
drwxr-xr-x   - flume-agent supergroup          0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16
-rw-r--r--   3 flume-agent supergroup       3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369
-rw-r--r--   3 flume-agent supergroup       3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774


$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head




$

(you're reading that correctly: empty lines)

If now I look at the file (using the binary view of HUE for example):

0000000:    0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a   ................
0000010:    0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a   ................
0000020:    0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a   ................
Bursary answered 15/6, 2015 at 18:49 Comment(0)

If I've understood correctly, you want to serialize both the data and the headers. In that case, you do not have to modify the data source; instead, use some standard Flume elements and create a custom serializer for HDFS.

The first step is to get Flume to create the desired JSON structure, i.e. headers+body. Flume can do this for you: just use JSONHandler at your HTTPSource, this way:

a1.sources = r1
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

In fact, it is not necessary to configure the JSON handler since it is the default one for HTTPSource.

Then, use both the Timestamp Interceptor and the Host Interceptor in order to add the desired headers. The only trick is that the Flume agent must run on the same machine as the sender process, so that the intercepted host is the same as the sender's:

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname

At this point, you will have the desired event. Nevertheless, the standard serializers for HDFS only save the body, not the headers. So create a custom serializer that implements org.apache.flume.serialization.EventSerializer. It is configured like this:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.serializer = my_custom_serializer
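
For reference, here is a minimal sketch of what such a serializer could look like. The class and package names are hypothetical, and the hand-rolled JSON escaping is a simplification (a real implementation would likely use a JSON library such as Gson instead):

package com.example.flume;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;

/** Hypothetical serializer that writes headers and body as one JSON object per line. */
public class HeaderAndBodyJsonSerializer implements EventSerializer {

    private final OutputStream out;

    private HeaderAndBodyJsonSerializer(Context context, OutputStream out) {
        this.out = out;
    }

    @Override
    public void afterCreate() throws IOException { /* nothing to write when the file is created */ }

    @Override
    public void afterReopen() throws IOException { /* nothing to do on reopen */ }

    @Override
    public void write(Event event) throws IOException {
        // Build {"headers":{...},"body":"..."} by hand; header values are assumed to be simple strings.
        StringBuilder sb = new StringBuilder("{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> h : event.getHeaders().entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(h.getKey()).append("\":\"").append(h.getValue()).append('"');
            first = false;
        }
        String body = new String(event.getBody(), StandardCharsets.UTF_8).replace("\"", "\\\"");
        sb.append("},\"body\":\"").append(body).append("\"}\n");
        out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void beforeClose() throws IOException { /* nothing to finalize */ }

    @Override
    public boolean supportsReopen() {
        return false;
    }

    /** Builder referenced from the sink configuration. */
    public static class Builder implements EventSerializer.Builder {
        @Override
        public EventSerializer build(Context context, OutputStream out) {
            return new HeaderAndBodyJsonSerializer(context, out);
        }
    }
}

As far as I know, the serializer property takes no hdfs. prefix, and for a custom class its value is the fully qualified name of its Builder, e.g. a1.sinks.k1.serializer = com.example.flume.HeaderAndBodyJsonSerializer$Builder, with the jar on the Flume agent's classpath.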

HTH

Angstrom answered 15/6, 2015 at 21:14 Comment(4)
I've found this link (grokbase.com/t/flume/user/128nspvnfg/…) saying that custom serializers for HDFSSink can only be created for fileType=CompressedStream or DataStream. I do not know if it has since been fixed for SequenceFiles.Angstrom
frb thanks for your response, I just pasted the configuration file, but when I look (using hadoop fs -cat /raw/log/2015-06-15/SomeLog-.1434410388430) I don't see anything (just a bunch of empty lines, which I suspect are binary). Can you see the error?Bursary
I added the binary output to the question ... It's not logging anything :(Bursary
could you update your answer with my finding so I can mark yours as the correct one?Bursary

The answer posted by @frb was correct; the only missing point is that the JSON generator must send the body part (I must admit/complain that the docs are not clear on that point), so the correct way of posting the JSON is:

[{"body": "{'username':'xyz','password':'123'}"}]

Note that the data JSON is now a string.

With this change, the JSON is now visible in HDFS.
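
As an illustration, here is a minimal sketch of a sender that posts the payload in that shape using plain HttpURLConnection (the class name is hypothetical; host and port are taken from the configuration above):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical sender: wraps the application JSON, as a string, inside the
// event list expected by Flume's JSONHandler and POSTs it to the HTTPSource.
public class FlumePoster {

    public static void main(String[] args) throws Exception {
        // The application-level JSON, stringified (single quotes avoid escaping).
        String data = "{'username':'xyz','password':'123'}";
        // JSONHandler expects a list of events; headers are optional, the body must be a string.
        String payload = "[{\"body\": \"" + data + "\"}]";

        URL url = new URL("http://localhost:5140");  // port taken from the agent configuration
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Flume HTTPSource replied: " + conn.getResponseCode());
    }
}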

Bursary answered 17/6, 2015 at 16:34 Comment(2)
Good point! Did you finally have to modify the event source in order to send the JSON as a string? Or did you modify the JSONHandler in some way in order to stringify the received data?Angstrom
I modified the source. The change was to stringify the JSON and add the body key.Bursary

The Flume HTTPSource using the default JSONHandler expects a list of fully-formed Flume events in JSON representation [{ headers: ..., body: ... }] to be submitted to the endpoint; to create an agent endpoint which can accept a bare application-level structure like {"username":"xyz", "password":"123"}, you can override the handler with an alternative class which implements HTTPSourceHandler; see the JSONHandler source - there's not a lot to it.

public List<Event> getEvents(HttpServletRequest request) throws ...

In a custom JSONHandler you could also add headers to the event based on the HTTP request, such as the source IP, User-Agent etc (an Interceptor won't have the context for this). You may want to validate the application-supplied JSON at this point (though the default handler doesn't).

Although as you've found, you can pass just the [{body: ...}] part, such a custom handler could also be useful if you want to prevent a generator injecting headers for the event.
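
For illustration, here is a minimal sketch of such a handler, with hypothetical class and package names; it reads the raw request body as the event body and adds a couple of request-derived headers (real code would likely validate the JSON first):

package com.example.flume;

import java.io.BufferedReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;

/** Hypothetical handler accepting a bare JSON document such as {"username":"xyz","password":"123"}. */
public class RawJsonHandler implements HTTPSourceHandler {

    @Override
    public List<Event> getEvents(HttpServletRequest request) throws HTTPBadRequestException, Exception {
        // Read the raw request body as-is; JSON validation could be added here.
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = request.getReader()) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        if (body.length() == 0) {
            throw new HTTPBadRequestException("Empty request body");
        }

        // Headers taken from the HTTP request itself (an interceptor cannot see these).
        Map<String, String> headers = new HashMap<>();
        headers.put("clientIp", request.getRemoteAddr());
        headers.put("userAgent", String.valueOf(request.getHeader("User-Agent")));

        Event event = EventBuilder.withBody(body.toString().getBytes("UTF-8"), headers);
        return Collections.singletonList(event);
    }

    @Override
    public void configure(Context context) {
        // No handler-specific configuration in this sketch.
    }
}

It would then be wired in with a1.sources.r1.handler = com.example.flume.RawJsonHandler, with the jar added to the agent's classpath (e.g. via plugins.d).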

Isotope answered 25/10, 2016 at 11:25 Comment(0)
