HDFS file watcher
Can I have file watcher on HDFS?

Scenario: Files are landing on HDFS continuously. I want to start a Spark job once the number of files reaches a threshold (the threshold can be a file count or a total file size).

Is it possible to implement a file watcher on HDFS to achieve this? If yes, can anyone suggest a way to do it? What are the different options available? Can ZooKeeper or Oozie do it?

Any help will be appreciated. Thanks.

Unprintable answered 30/4, 2015 at 6:5 Comment(7)
Spark Streaming has a similar function in FileInputDStream. – Rafaelof
A simple thing I can think of is a shell command like: hadoop fs -ls | wc -l – Hesitant
@YijieShen Can you elaborate on it more, please? – Unprintable
@Hesitant Yes, it will list all the files and their details in the HDFS directory. I am trying to implement it as per your suggestion. – Unprintable
Yes, you can do this with HDFS inotify. You just need to read the HDFS transaction details through the inotify stream; read this link to get a better understanding. – Marmoreal
I will try it. – Unprintable
Go ahead, and let me know if you face any issues. – Marmoreal
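The shell idea from the comments can be fleshed out into a small polling check. This is only a sketch: the directory, the threshold, and the spark-submit line are hypothetical placeholders, and the local `ls` stands in for `hadoop fs -ls` so the logic can run anywhere.

```shell
# Sketch of the "hadoop fs -ls | wc -l" idea: count files in a landing
# directory and fire a job once the count crosses a threshold.
# DIR, THRESHOLD, and the spark-submit line are hypothetical placeholders.
DIR=${DIR:-/tmp/landing}
THRESHOLD=${THRESHOLD:-100}

count_files() {
  # On a real cluster, replace with: hadoop fs -ls "$1" 2>/dev/null | grep -c '^[-d]'
  ls -1 "$1" 2>/dev/null | wc -l
}

count=$(count_files "$DIR")
if [ "$count" -ge "$THRESHOLD" ]; then
  echo "threshold reached: $count files in $DIR"
  # spark-submit --class com.example.Job job.jar "$DIR"  # hypothetical job launch
fi
```

Run it from cron or a loop with `sleep`; note that a plain count re-triggers on every poll once past the threshold, so a real setup would also move or mark the processed files.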

Hadoop 2.6 introduced DFSInotifyEventInputStream, which you can use for this. You can get an instance of it from HdfsAdmin and then call .take() or .poll() to receive event batches. Event types include delete, append, and create, which should cover what you're looking for.

Here's a basic example. Make sure you run it as the hdfs user, as the admin interface requires HDFS superuser privileges.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
    HdfsAdmin admin = new HdfsAdmin( URI.create( args[0] ), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
        // take() blocks until at least one event batch is available
        EventBatch events = eventStream.take();
        for( Event event : events.getEvents() ) {
            System.out.println( "event type = " + event.getEventType() );
            switch( event.getEventType() ) {
                case CREATE:
                    CreateEvent createEvent = (CreateEvent) event;
                    System.out.println( "  path = " + createEvent.getPath() );
                    break;
                default:
                    break;
            }
        }
    }
}

Here's a blog post that covers it in more detail:

http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1

Greenman answered 1/5, 2015 at 17:55 Comment(0)

An Oozie coordinator can do this. Oozie coordinator actions can be triggered based on data availability, so write a data-triggered coordinator. The coordinator action fires based on a done-flag, which is nothing but an (often empty) file. So when your threshold is reached, write an empty flag file into the directory.
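A minimal data-triggered coordinator along these lines might look like the following sketch. The paths, dates, frequency, and workflow location are hypothetical placeholders; the action is held until the `_READY` done-flag file appears in the dataset directory.

```xml
<coordinator-app name="file-threshold-coord" frequency="${coord:minutes(15)}"
                 start="2017-06-01T00:00Z" end="2018-06-01T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
  <datasets>
    <dataset name="landing" frequency="${coord:minutes(15)}"
             initial-instance="2017-06-01T00:00Z" timezone="UTC">
      <uri-template>hdfs://namenode/data/landing</uri-template>
      <!-- The action waits until this flag file exists in the directory -->
      <done-flag>_READY</done-flag>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="landing">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <!-- Hypothetical workflow app that launches the Spark job -->
      <app-path>hdfs://namenode/apps/spark-job-wf</app-path>
    </workflow>
  </action>
</coordinator-app>
```

Whatever process counts the landing files is then responsible for writing the `_READY` flag once the threshold is crossed.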

Oiler answered 5/6, 2017 at 13:22 Comment(0)

Old thread... In case someone wants to do this in Scala:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}


object HDFSTest extends App {
  val admin = new HdfsAdmin( URI.create( "hdfs://namenode:port" ), new Configuration() )
  val eventStream = admin.getInotifyEventStream()

  while( true ) {
    // poll() returns null when no events arrive within the timeout
    val batch = eventStream.poll(2L, java.util.concurrent.TimeUnit.SECONDS)
    if (batch != null) {
      batch.getEvents.toList.foreach { event =>
        println(s"event type = ${event.getEventType}")
        event match {
          case create: CreateEvent =>
            println("CREATE: " + create.getPath)

          case rename: RenameEvent =>
            println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)

          case append: AppendEvent =>
            println("APPEND: " + append.getPath)

          case other =>
            println("other: " + other)
        }
      }
    }
  }
}

In case one wants to run it as an impersonated user, set the environment variable HADOOP_USER_NAME=user-name.

Memling answered 10/9, 2019 at 20:45 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.