07 May 2015

Monitoring a java application with mbeans. An example with samtools/htsjdk.

"A MBean is a Java object that follows the JMX specification. A MBean can represent a device, an application, or any resource that needs to be managed. The JConsole graphical user interface is a monitoring tool that complies to the JMX specification.". In this post I'll show how I've modified the sources of the htsjdk library to monitor the java program reading a VCF file from the Exac server. See my commit at https://github.com/lindenb/htsjdk/commit/3c1ac1a18917aaa69f8dc49c70fd893a6a0542c3.

First, we define a Java interface, ProgressLoggerMBean, declaring the information that will be exposed to JConsole: the number of records processed, the elapsed time, etc.

package htsjdk.samtools.util;
public interface ProgressLoggerMBean
    {
    /** The noun to use when logging, e.g. "Records, Variants, Loci". */
    public String getNoun();
    /** The verb to use when logging, e.g. "Processed, Read, Written". */
    public String getVerb();
    /** Returns the count of records processed. */
    public long getCount();
    /** elapsed time */
    public String getElapsedTime();
    /** last record */
    public String getLastRecord();
    }
The already existing htsjdk class htsjdk.samtools.util.ProgressLogger is modified: it now implements ProgressLoggerMBean:
public class ProgressLogger
   implements ProgressLoggerInterface, Closeable, ProgressLoggerMBean
The methods are implemented:
(...)
    @Override
    public String getElapsedTime(){
        return this.formatElapseTime(this.getElapsedSeconds());
    }
    
    @Override
    public String getLastRecord(){
        return this.lastRecord;
    }
In the constructor, we get the platform MBean server that was created and initialized by the JVM. The ProgressLogger is then registered with this MBean server under an ObjectName:
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
/* defines an object name for the MBean instance that it will create */
this.objectMBean = new ObjectName("htsjdk.samtools.util:type="+noun);
mbs.registerMBean(this, this.objectMBean);
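Note that both new ObjectName(...) and registerMBean(...) throw checked exceptions (MalformedObjectNameException, InstanceAlreadyExistsException, ...), so in the constructor the registration has to be wrapped or declared. A minimal sketch of a guarded registration, so that monitoring stays optional (an illustration only, not necessarily what the commit does):

try {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    /* defines an object name for the MBean instance that it will create */
    this.objectMBean = new ObjectName("htsjdk.samtools.util:type="+noun);
    mbs.registerMBean(this, this.objectMBean);
} catch(Exception err) {
    /* monitoring is optional: keep running even if the MBean cannot be registered */
    this.objectMBean = null;
}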

A 'close' method is used to unregister the object from the MBean server:
@Override
public void close() {
    if(this.objectMBean!=null) {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            mbs.unregisterMBean(this.objectMBean);
        } catch(Exception err) {
            //ignore
        } finally {
            this.objectMBean=null;
        }
    }
}
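Since ProgressLogger is now Closeable, callers can also let a try-with-resources statement do the unregistration; a small sketch (the variable names are just for illustration):

try (ProgressLogger progress = new ProgressLogger(log, 1000, "Read")) {
    /* ... call progress.record(contig, position) for each record ... */
}
/* close() is invoked automatically and unregisters the MBean */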
Here is an example. This program uses the htsjdk library to parse a VCF file:
import htsjdk.variant.vcf.*;
import htsjdk.variant.variantcontext.*;
import htsjdk.tribble.readers.*;
import htsjdk.samtools.util.*;

public class TestProgress
 {
 private final static Log log = Log.getInstance(TestProgress.class);
 public static void main(String args[]) throws Exception
  {
  ProgressLogger progress = new ProgressLogger(log, 1000, "Read VCF"); // concrete class, so that close() is available
  VCFCodec codec= new VCFCodec();
  LineReader r= LineReaderUtil.fromBufferedStream(System.in);
  LineIteratorImpl t= new LineIteratorImpl(r);
  codec.readActualHeader(t);
  while(t.hasNext())
   {
   VariantContext ctx = codec.decode(t.next());
   progress.record(ctx.getContig(),ctx.getStart());
   }
  r.close();
  progress.close();
  }
 }
Compile, then stream the ExAC VCF from the Broad FTP server into the program:
javac -cp dist/htsjdk-1.130.jar:dist/snappy-java-1.0.3-rc3.jar:dist/commons-jexl-2.1.1.jar:dist/commons-logging-1.1.1.jar TestProgress.java && \
curl -s  "ftp://ftp.broadinstitute.org/pub/ExAC_release/release0.3/ExAC.r0.3.sites.vep.vcf.gz" |\
gunzip -c  |\
java -cp dist/htsjdk-1.130.jar:dist/snappy-java-1.0.3-rc3.jar:dist/commons-jexl-2.1.1.jar:dist/commons-logging-1.1.1.jar:. TestProgress

(...)
INFO 2015-05-07 21:07:02 TestProgress Read VCF       675,000 records.  Elapsed time: 00:03:33s.  Time for last 1,000:    0s.  Last read position: 1:168,035,033
INFO 2015-05-07 21:07:03 TestProgress Read VCF       676,000 records.  Elapsed time: 00:03:33s.  Time for last 1,000:    0s.  Last read position: 1:168,216,140
INFO 2015-05-07 21:07:03 TestProgress Read VCF       677,000 records.  Elapsed time: 00:03:34s.  Time for last 1,000:    0s.  Last read position: 1:169,076,058
INFO 2015-05-07 21:07:03 TestProgress Read VCF       678,000 records.  Elapsed time: 00:03:34s.  Time for last 1,000:    0s.  Last read position: 1:169,366,434
INFO 2015-05-07 21:07:03 TestProgress Read VCF       679,000 records.  Elapsed time: 00:03:34s.  Time for last 1,000:    0s.  Last read position: 1:169,500,081
(...)

The progress can now be monitored in JConsole (jconsole can attach to the running java process):
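JConsole is not the only possible client: any JMX connector can read the same attributes. Below is a minimal, hypothetical sketch (not part of the original post) that connects to a JVM started with remote JMX enabled (e.g. -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false) and dumps the ProgressLogger attributes. The port and the ObjectName are assumptions: the ObjectName must match the noun used by the ProgressLogger ("records" is the default noun).

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DumpProgress
 {
 public static void main(String args[]) throws Exception
  {
  /* assumed: the monitored JVM listens for remote JMX on localhost:9999 */
  JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
  JMXConnector connector = JMXConnectorFactory.connect(url);
  try
   {
   MBeanServerConnection mbs = connector.getMBeanServerConnection();
   /* must match the ObjectName built in the ProgressLogger constructor */
   ObjectName name = new ObjectName("htsjdk.samtools.util:type=records");
   System.out.println("Count       : " + mbs.getAttribute(name, "Count"));
   System.out.println("ElapsedTime : " + mbs.getAttribute(name, "ElapsedTime"));
   System.out.println("LastRecord  : " + mbs.getAttribute(name, "LastRecord"));
   }
  finally
   {
   connector.close();
   }
  }
 }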


That's it.
Pierre

05 May 2015

Playing with hadoop/mapreduce and htsjdk/VCF : my notebook.

The aim of this test is to get a count of each type of variant/genotype in a VCF file using Apache Hadoop and the Java library for NGS htsjdk. My source code is available at: https://github.com/lindenb/hadoop-sandbox/blob/master/src/main/java/com/github/lindenb/hadoop/Test.java.

First, and this is my main problem, I needed to create a class 'VcfRow' that would contain the whole data about a variant. As I need to keep the information about all the semantics in the VCF header, each record carries the whole VCF header (!). I asked on StackOverflow whether there was an elegant way to save the header in the hadoop workflow, but it currently seems that there is no such solution (http://stackoverflow.com/questions/30052859/hadoop-mapreduce-handling-a-text-file-with-a-header). This class VcfRow must implement WritableComparable to be serialized by the hadoop pipeline. It's awfully sloooooow since a htsjdk.variant.vcf.VCFHeader has to be re-parsed with a htsjdk.variant.vcf.VCFCodec for each new variant.

public static class VcfRow
implements WritableComparable<VcfRow>
 {
 private List<String> headerLines;
 private String line;
 private VariantContext ctx=null;
 private VCFHeader header =null;
 private VCFCodec codec=new VCFCodec();
 public VcfRow()
   {
 this.headerLines = Collections.emptyList();
 this.line="";
   }
 public VcfRow(List<String> headerLines,String line)
  {
 this.headerLines=headerLines; 
 this.line=line;
  }
 
@Override
public void write(DataOutput out) throws IOException
 {
 out.writeInt(this.headerLines.size());
 for(int i=0;i< this.headerLines.size();++i)
  {
  out.writeUTF(this.headerLines.get(i));
  }
 byte array[]=line.getBytes();
 out.writeInt(array.length);
 out.write(array);
 }

@Override
public void readFields(DataInput in) throws IOException
 {
 int n= in.readInt();
 this.headerLines=new ArrayList<String>(n);
 for(int i=0;i<n;++i) this.headerLines.add(in.readUTF());
 n = in.readInt();
 byte array[]=new byte[n];
 in.readFully(array);
 this.line=new String(array);
 this.codec=new VCFCodec();
 this.ctx=null;
 this.header=null;
 }

public VCFHeader getHeader()
 {
 if(this.header==null)
  {
  this.header = (VCFHeader)this.codec.readActualHeader(new MyLineIterator());
  }
 return this.header;
 }

public VariantContext getVariantContext()
 {
 if(this.ctx==null)
  {
  if(this.header==null) getHeader();//force decode header
  this.ctx=this.codec.decode(this.line);
  }
 return this.ctx;
 }

@Override
public int compareTo(VcfRow o)
 {
 int i = this.getVariantContext().getContig().compareTo(o.getVariantContext().getContig());
 if(i!=0) return i;
 i = this.getVariantContext().getStart() - o.getVariantContext().getStart();
 if(i!=0) return i;
 i =  this.getVariantContext().getReference().compareTo( o.getVariantContext().getReference());
 if(i!=0) return i;
 return this.line.compareTo(o.line);
 }

   private  class MyLineIterator
 extends AbstractIterator<String>
 implements LineIterator
 { 
 int index=0;
 @Override
 protected String advance()
  {
  if(index>= headerLines.size()) return null;
  return headerLines.get(index++);
  }
 }
}
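To check that write() and readFields() mirror each other, a VcfRow can be round-tripped through a byte array. Below is a minimal, hypothetical sanity check (not part of the original code); the header lines and the variant line are made up, and VcfRow is assumed to be visible from here (it is a static nested class of Test, i.e. Test.VcfRow):

import java.io.*;
import java.util.Arrays;

public class TestVcfRowRoundTrip
 {
 public static void main(String args[]) throws Exception
  {
  VcfRow row = new VcfRow(
    Arrays.asList(
      "##fileformat=VCFv4.1",
      "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO"),
    "1\t10583\trs123\tG\tA\t100\tPASS\t.");
  /* serialize */
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  row.write(new DataOutputStream(bytes));
  /* deserialize into a fresh instance */
  VcfRow copy = new VcfRow();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
  /* both should decode to the same position: 1:10583 */
  System.out.println(copy.getVariantContext().getContig()+":"+copy.getVariantContext().getStart());
  }
 }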

Then a special InputFormat is created for the VCF format. As we need to keep track of the header, this InputFormat declares `isSplitable()==false` so that a file is never split. The class VcfInputFormat creates an instance of RecordReader that reads the whole VCF header the first time its `initialize` method is invoked. This 'VcfRecordReader' then creates a new VcfRow for each data line.

public static class VcfInputFormat extends FileInputFormat<LongWritable, VcfRow>
   {
 private List<String> headerLines=new ArrayList<String>();
 
 @Override
 public RecordReader<LongWritable, VcfRow> createRecordReader(InputSplit split,
   TaskAttemptContext context) throws IOException,
   InterruptedException {
  return new VcfRecordReader();
  }  
 @Override
 protected boolean isSplitable(JobContext context, Path filename) {
  return false;
  }
  
 //LineRecordReader
  private class VcfRecordReader extends RecordReader<LongWritable, VcfRow>
    {
  private LineRecordReader delegate=new LineRecordReader();
  public VcfRecordReader() throws IOException
    {
    }
  
   @Override
  public void initialize(InputSplit genericSplit,
    TaskAttemptContext context) throws IOException {
    delegate.initialize(genericSplit, context);
   while( delegate.nextKeyValue())
    {
    String row = delegate.getCurrentValue().toString();
    if(!row.startsWith("#")) throw new IOException("Bad VCF header");
    headerLines.add(row);
    if(row.startsWith("#CHROM")) break;
    }
    }
   @Override
  public LongWritable getCurrentKey() throws IOException,
    InterruptedException {
   return delegate.getCurrentKey();
    }
   
   @Override
  public VcfRow getCurrentValue() throws IOException,
    InterruptedException {
   Text row = this.delegate.getCurrentValue();
   return new VcfRow(headerLines,row.toString());
    }
   
   @Override
  public float getProgress() throws IOException, InterruptedException {
   return this.delegate.getProgress();
    }
   
   @Override
  public boolean nextKeyValue() throws IOException,
    InterruptedException {
   return this.delegate.nextKeyValue();
    }
   
   @Override
  public void close() throws IOException {
    delegate.close();
   }
     }
   }

The hadoop Mapper uses the information of each VcfRow and produces a count for each category:
public static class VariantMapper
   extends Mapper<LongWritable, VcfRow, Text, IntWritable>{

 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public void map(LongWritable key, VcfRow vcfRow, Context context ) throws IOException, InterruptedException {
  VariantContext ctx = vcfRow.getVariantContext();
  if( ctx.isIndel())
   { 
   word.set("ctx_indel");
      context.write(word, one);
   }
  if( ctx.isBiallelic())
   { 
   word.set("ctx_biallelic");
      context.write(word, one);
   }
  if( ctx.isSNP())
   { 
   word.set("ctx_snp");
   context.write(word, one);
   } 
  if( ctx.hasID())
   { 
   word.set("ctx_id");
   context.write(word, one);
   } 
  word.set("ctx_total");
  context.write(word, one);
 
  for(String sample: vcfRow.getHeader().getSampleNamesInOrder())
   {
   Genotype g =vcfRow.getVariantContext().getGenotype(sample);
   word.set(sample+" "+ctx.getType()+" "+g.getType().name());
   context.write(word, one);
   }

  }
 }

The Reducer computes the sum of each category:
public static class IntSumReducer
   extends Reducer<Text,IntWritable,Text,IntWritable> {
 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
  sum += val.get();
    }
   result.set(sum);
   context.write(key, result);
 }
}

and here is the main program:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "snp count");
    job.setJarByClass(Test.class);
    job.setMapperClass(VariantMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path inputPath=new Path(args[0]);
    job.setInputFormatClass(VcfInputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

Download, compile, run:
lindenb@hardyweinberg:~/src/hadoop-sandbox$ make -Bn
rm -rf hadoop-2.7.0
curl -L -o hadoop-2.7.0.tar.gz "http://apache.spinellicreations.com/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz"
tar xvfz hadoop-2.7.0.tar.gz
rm hadoop-2.7.0.tar.gz
touch -c hadoop-2.7.0/bin/hadoop
rm -rf htsjdk-1.130
curl -L -o 1.130.tar.gz "https://github.com/samtools/htsjdk/archive/1.130.tar.gz"
tar xvfz 1.130.tar.gz
rm 1.130.tar.gz
(cd htsjdk-1.130 && ant )
mkdir -p tmp dist
javac -d tmp -cp hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0.jar:hadoop-2.7.0/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/hadoop-annotations-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/log4j-1.2.17.jar:htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar -sourcepath src/main/java src/main/java/com/github/lindenb/hadoop/Test.java 
jar cvf dist/test01.jar -C tmp .
rm -rf tmp
mkdir -p input
curl -o input/CEU.exon.2010_09.genotypes.vcf.gz "ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/pilot_data/paper_data_sets/a_map_of_human_variation/exon/snps/CEU.exon.2010_09.genotypes.vcf.gz"
gunzip -f input/CEU.exon.2010_09.genotypes.vcf.gz
rm -rf output
HADOOP_CLASSPATH=htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar hadoop-2.7.0/bin/hadoop jar dist/test01.jar com.github.lindenb.hadoop.Test \
   input/CEU.exon.2010_09.genotypes.vcf output
cat output/*

Here is the output of the last command:

15/05/05 17:18:34 INFO input.FileInputFormat: Total input paths to process : 1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: number of splits:1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/05/05 17:18:34 INFO mapreduce.Job: Running job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Waiting for map tasks
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Starting task: attempt_local1186897577_0001_m_000000_0
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/05/05 17:18:34 INFO mapred.MapTask: Processing split: file:/home/lindenb/src/hadoop-sandbox/input/CEU.exon.2010_09.genotypes.vcf:0+2530564
15/05/05 17:18:34 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/05/05 17:18:34 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/05/05 17:18:34 INFO mapred.MapTask: soft limit at 83886080
15/05/05 17:18:34 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/05/05 17:18:34 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/05/05 17:18:34 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/05/05 17:18:35 INFO mapreduce.Job: Job job_local1186897577_0001 running in uber mode : false
15/05/05 17:18:35 INFO mapreduce.Job:  map 0% reduce 0%
15/05/05 17:18:36 INFO mapred.LocalJobRunner: 
15/05/05 17:18:36 INFO mapred.MapTask: Starting flush of map output
15/05/05 17:18:36 INFO mapred.MapTask: Spilling map output
15/05/05 17:18:36 INFO mapred.MapTask: bufstart = 0; bufend = 7563699; bufvoid = 104857600
15/05/05 17:18:36 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 24902536(99610144); length = 1311861/6553600
15/05/05 17:18:38 INFO mapred.MapTask: Finished spill 0
15/05/05 17:18:38 INFO mapred.Task: Task:attempt_local1186897577_0001_m_000000_0 is done. And is in the process of committing
(...)
NA12843 SNP HOM_REF 2515
NA12843 SNP HOM_VAR 242
NA12843 SNP NO_CALL 293
NA12872 SNP HET 394
NA12872 SNP HOM_REF 2282
NA12872 SNP HOM_VAR 188
NA12872 SNP NO_CALL 625
NA12873 SNP HET 336
NA12873 SNP HOM_REF 2253
NA12873 SNP HOM_VAR 184
NA12873 SNP NO_CALL 716
NA12874 SNP HET 357
NA12874 SNP HOM_REF 2395
NA12874 SNP HOM_VAR 229
NA12874 SNP NO_CALL 508
NA12878 SNP HET 557
NA12878 SNP HOM_REF 2631
NA12878 SNP HOM_VAR 285
NA12878 SNP NO_CALL 16
NA12889 SNP HET 287
NA12889 SNP HOM_REF 2110
NA12889 SNP HOM_VAR 112
NA12889 SNP NO_CALL 980
NA12890 SNP HET 596
NA12890 SNP HOM_REF 2587
NA12890 SNP HOM_VAR 251
NA12890 SNP NO_CALL 55
NA12891 SNP HET 609
NA12891 SNP HOM_REF 2591
NA12891 SNP HOM_VAR 251
NA12891 SNP NO_CALL 38
NA12892 SNP HET 585
NA12892 SNP HOM_REF 2609
NA12892 SNP HOM_VAR 236
NA12892 SNP NO_CALL 59
ctx_biallelic 3489
ctx_id 3489
ctx_snp 3489
ctx_total 3489


That's it,
Pierre