Friday, January 4, 2013

MapReduce implementation with Hadoop

Hadoop is an open source framework that supports distributed big data applications. One of its main features is an implementation of the MapReduce algorithm. Hadoop gives us the opportunity to use its API to implement our own mapping and reducing. To run your first Hadoop job you will need to implement the generic Mapper and Reducer interfaces. See the examples below.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class Map extends MapReduceBase implements
  Mapper<LongWritable, Text, Text, IntWritable> {

 private final IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public void map(LongWritable key, Text value,
   OutputCollector<Text, IntWritable> output, Reporter reporter)
   throws IOException {

  // split the line into words and emit a (word, 1) pair for each one
  String line = value.toString();
  StringTokenizer tokenizer = new StringTokenizer(line);
  while (tokenizer.hasMoreTokens()) {
   word.set(tokenizer.nextToken());
   output.collect(word, one);
  }
 }
}
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase implements
  Reducer<Text, IntWritable, Text, IntWritable> {
 public void reduce(Text key, Iterator<IntWritable> values,
   OutputCollector<Text, IntWritable> output, Reporter reporter)
   throws IOException {
  int sum = 0;

  // sum all the counts emitted for this key (word)
  while (values.hasNext()) {
   sum += values.next().get();
  }

  output.collect(key, new IntWritable(sum));
 }
}
The last thing you need to do is wire them together in the job configuration.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Job {

 public static void main(String[] args) throws Exception {

  JobConf conf = new JobConf(Job.class);
  conf.setJobName("Hadoop-Workshop-GKOLPU");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(Map.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  // input and output paths are taken from the command line arguments
  FileInputFormat.setInputPaths(conf, new Path(args[1]));
  FileOutputFormat.setOutputPath(conf, new Path(args[2]));

  JobClient.runJob(conf);
 }
}

Hadoop. Using HDFS.

One of Hadoop's concepts is having a consistent file system across all cluster nodes. For the Hadoop Distributed File System a Unix-like set of commands is available. You can read the full syntax by calling hadoop dfs -help; nevertheless I attached part of it below (a short programmatic example follows after the list).

-fs [local | <file system URI>]: Specify the file system to use.

-ls <path>: List the contents that match the specified file pattern.

-lsr <path>: Recursively list the contents that match the specified file pattern.

-mv <src> <dst>: Move files that match the specified file pattern <src> to a destination <dst>.

-cp <src> <dst>: Copy files that match the file pattern <src> to a destination.

-rm [-skipTrash] <src>: Delete all files that match the specified file pattern. Equivalent to the Unix command "rm <src>"

-rmr [-skipTrash] <src>: Remove all directories which match the specified file pattern. Equivalent to the Unix command "rm -rf <src>"

-put <localsrc> ... <dst>: Copy files from the local file system into fs.

-copyFromLocal <localsrc> ... <dst>: Identical to the -put command.

-moveFromLocal <localsrc> ... <dst>: Same as -put, except that the source is deleted after it's copied.

-get [-ignoreCrc] [-crc] <src> <localdst>: Copy files that match the file pattern <src> to the local name. <src> is kept.

-cat <src>: Fetch all files that match the file pattern <src> and display their content on stdout.

-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>: Identical to the -get command.

-mkdir <path>: Create a directory in specified location.

-tail [-f] <file>: Show the last 1KB of the file. The -f option shows appended data as the file grows.
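The same operations are also available programmatically. Below is a minimal sketch (the HDFS directory and the local file path are made up for illustration) that uses the org.apache.hadoop.fs.FileSystem API to mirror a few of the commands above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExample {

 public static void main(String[] args) throws Exception {
  // picks up fs.default.name from core-site.xml on the classpath
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);

  Path dir = new Path("/user/gkolpu/input");          // hypothetical HDFS path
  fs.mkdirs(dir);                                     // like: hadoop dfs -mkdir <path>
  fs.copyFromLocalFile(new Path("/tmp/sample.txt"),   // like: hadoop dfs -put <localsrc> <dst>
    dir);

  for (FileStatus status : fs.listStatus(dir)) {      // like: hadoop dfs -ls <path>
   System.out.println(status.getPath() + " " + status.getLen());
  }
 }
}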

Hadoop. MapReduce File InputFormat.

In Hadoop it is very important how we read data for MapReduce. The standard input is a set of files in HDFS. I will try to explain how to define your own file input format. The first step is to implement the InputFormat interface or to extend one of its implementations, like in the example below.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class EmailInputFormat extends FileInputFormat<Text, Email> {

 @Override
 public RecordReader<Text, Email> getRecordReader(InputSplit split,
  JobConf job, Reporter reporter) throws IOException {
  reporter.setStatus(split.toString());
  return new EmailRecordReader(job, (FileSplit) split);
 }

}
The getRecordReader method has to return a RecordReader implementation, so let's create one more class.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class EmailRecordReader implements RecordReader<Text, Email> {
 private LineRecordReader lineReader;
 private LongWritable lineKey;
 private Text lineValue;

 public EmailRecordReader(JobConf job, FileSplit split) throws
                 IOException {
  // wrap the standard line reader and reuse its key/value buffers
  lineReader = new LineRecordReader(job, split);

  lineKey = lineReader.createKey();
  lineValue = lineReader.createValue();
 }

 public boolean next(Text key, Email value) throws
                 IOException {
  // TODO Auto-generated method stub
  // put your code here

  // --
  return false;
 }

 public Text createKey() {
  return new Text("");
 }

 public Email createValue() {
  return new Email();
 }

 public long getPos() throws IOException {
  return lineReader.getPos();
 }

 public void close() throws IOException {
  lineReader.close();
 }

 public float getProgress() throws IOException {
  return lineReader.getProgress();
 }

}
We need to implement the constructor to make use of the FileSplit, but the most important part of this code is the next method. The Hadoop engine will keep calling this method until it returns false, so we can use it to produce as many input records as we want.
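As an illustration, here is a minimal sketch of what next could look like. It simply delegates to the wrapped LineRecordReader and treats each input line as one record; the setBody setter on Email is hypothetical, and the real parsing logic depends on how your Email class is defined.
// a possible next() implementation inside EmailRecordReader
public boolean next(Text key, Email value) throws IOException {
 // ask the wrapped LineRecordReader for the next raw line of the split
 if (!lineReader.next(lineKey, lineValue)) {
  return false;                        // no more data in this split
 }
 String line = lineValue.toString();
 key.set(line);                        // for example: the raw line as the key
 value.setBody(line);                  // hypothetical setter on the Email class
 return true;                          // one more record was produced
}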

Hadoop. MapReduce File OutputFormat.

Hadoop provides a lot of API for customizing MapReduce. Almost always we want to get the results in some specific format. We just need to do two things: implement the OutputFormat interface (or extend one of its implementations) and provide a RecordWriter. See the code below.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;

public class EmailXmlOutputFormat extends FileOutputFormat<Text, Text> {

 public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored,
                  JobConf job, String name, Progressable progress)
                  throws IOException {
  // create the task output file in HDFS and hand its stream to the writer
  Path file = FileOutputFormat
                         .getTaskOutputPath(job, name);
  FileSystem fs = file.getFileSystem(job);
  FSDataOutputStream fileOut = fs.create(file, progress);
  return new EmailXmlRecordWriter(fileOut);
 }
}
To make the code complete, we implement the RecordWriter...

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;

public class EmailXmlRecordWriter implements RecordWriter<Text, Text> {
 private static final String utf8 = "UTF-8";

 private DataOutputStream out;

 public EmailXmlRecordWriter(DataOutputStream out)
    throws IOException {
  this.out = out;
  out.writeBytes("\n");
 }

 public synchronized void write(Text key, Text value)
    throws IOException {

  boolean nullKey = key == null;
  boolean nullValue = value == null;

  if (nullKey && nullValue) {
   return;
  }

  //TODO
  //put your code here
  //write to out stream

 }

 public synchronized void close(Reporter reporter)
    throws IOException {
  try {
   out.writeBytes("\n");
  } finally {
   out.close();
  }
 }
}
First we code the constructor and the close method. Then we put some logic into write. Done.
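For example, a minimal sketch of write could look like the snippet below. The <email>, <subject> and <body> element names are just assumptions for illustration; in a real implementation the values should also be XML-escaped.
// a possible write() implementation inside EmailXmlRecordWriter
public synchronized void write(Text key, Text value) throws IOException {
 boolean nullKey = key == null;
 boolean nullValue = value == null;

 if (nullKey && nullValue) {
  return;                                      // nothing to write
 }

 out.writeBytes("<email>");                     // hypothetical element names
 if (!nullKey) {
  out.writeBytes("<subject>" + key.toString() + "</subject>");
 }
 if (!nullValue) {
  out.writeBytes("<body>" + value.toString() + "</body>");
 }
 out.writeBytes("</email>\n");
}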

Custom database logging with JBoss AS 6

I needed to persist some part of my application logs (deployed on JBoss AS 6) into an Oracle database. I started by looking into the jboss-logging.xml file. There is a lot of commented-out example code: using JMS, sending emails, file loggers and others, but unfortunately nothing about logging into a database. Not a problem, I thought, I just need to check the JBoss documentation. I was very surprised, but on jboss.org there is nothing, no documentation, zero. For AS 5 or 7 there is no problem, documentation is available, but not for AS 6 – unbelievable. Next I searched the internet. I found not much: only a few posts on forums and a few articles about log4j logging, but with no JBoss AS 6 syntax, which is different from previous versions and from a typical log4j configuration XML file. So I wanted to share my result; it was not as easy as I expected, so it could be useful. First you need to add a log4j-appender declaration:
<log4j-appender name="DEV_JDBC"
                class="org.apache.log4j.jdbc.JDBCAppender">
   <error-manager>
      <only-once/>
   </error-manager>
   <level name="INFO"/>
   <properties>
      <property name="driver">oracle.jdbc.driver.OracleDriver</property>
      <property name="URL">jdbc:oracle:thin:@mydbhost.gkolpu.com:1521:dev</property>
      <property name="user">DEV</property>
      <property name="password">DEV</property>
      <property name="sql">
         insert into logging_table(id, tmstmp, message)
         values(sequence.nextval, now(), '%m')
      </property>
      <property name="bufferSize">100</property>
   </properties>
</log4j-appender>
And use it in a logger:
<logger category="org.gkolpu.logging.db" use-parent-handlers="false">
   <level name="INFO"/>
   <handlers>
      <handler-ref name="DEV_JDBC"/>
   </handlers>
</logger>
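Just to show how this is picked up from application code, here is a small hypothetical example: any logger whose category falls under org.gkolpu.logging.db (the class below is made up) will be routed to the DEV_JDBC appender, assuming the code logs through log4j or a facade bridged into the JBoss log manager.
import org.apache.log4j.Logger;

public class OrderService {                      // hypothetical class in the logged category

 private static final Logger DB_LOG =
   Logger.getLogger("org.gkolpu.logging.db.OrderService");

 public void processOrder(String id) {
  // INFO and above ends up in logging_table via the DEV_JDBC appender,
  // with the message substituted for %m in the configured sql property
  DB_LOG.info("processed order " + id);
 }
}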
Actually my goal was to call a stored procedure to save the log in the database, but I couldn't use JDBCAppender for that; it was not working. To fix this I coded my own appender, actually I only extended JDBCAppender. The original code used the Statement class to execute the insert operation. This is the Java code (the jar with the compiled class must be put into the JBoss libs).
package org.gkolpu;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.log4j.jdbc.JDBCAppender;

public class CallableJDBCAppender extends JDBCAppender {
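 // execute the configured "sql" property through a CallableStatement
 // so that a stored procedure call like {call ...} can be used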
 @Override
 protected void execute(String sql) throws SQLException {

  Connection con = null;
  CallableStatement call = null;

  try {
   con = getConnection();
   call = con.prepareCall(sql);
   call.execute();
  } finally {
   if (call != null) {
    call.close();
   }
   closeConnection(con);
  }
 }
}
And a few changes in the appender declaration:
<log4j-appender name="DEV_JDBC"
                class="org.gkolpu.CallableJDBCAppender">
   <error-manager>
      <only-once/>
   </error-manager>
   <level name="INFO"/>
   <properties>
      <property name="driver">oracle.jdbc.driver.OracleDriver</property>
      <property name="URL">jdbc:oracle:thin:@mydbhost.gkolpu.com:1521:dev</property>
      <property name="user">DEV</property>
      <property name="password">DEV</property>
      <property name="sql">
         {call MY_PACKAGE.log_message('%m')}
      </property>
      <property name="bufferSize">100</property>
   </properties>
</log4j-appender>