Tuesday, 27 September 2016

MR Lab9 : Mapper Only Functionalities


Mapper Only functionality.
--------------------------

  row filter:

    ex: select * from emp where sex = 'm';

  for this reducer is not required.

     we need to suspend the reducer..
   ex:
        j.setNumReduceTasks(0);

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilterMap extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select * from emp where sex ="m";
      String line = v.toString();
      String[] w = line.split(",");
      String sex = w[3];
      if(sex.matches("m"))
         con.write( v , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(RowFilterMap.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop fs -cat mrlab/emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/males

[training@localhost ~]$ hadoop fs -ls mrlab/males
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-09-27 06:57 /user/training/mrlab/males/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-09-27 06:57 /user/training/mrlab/males/_logs
-rw-r--r--   1 training supergroup         60 2016-09-27 06:57 /user/training/mrlab/males/part-m-00000
[training@localhost ~]$ hadoop fs -cat mrlab/males/part-m-00000
101,vino,26000,m,11
103,mohan,13000,m,13
105,naga,6000,m,13
[training@localhost ~]$

-------------------------------------------
 RowFilter 2:

   on unstructured Text:

[training@localhost ~]$ cat > news
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData           
Pakistan Jurking India
BigData is a hipe or real
[training@localhost ~]$ hadoop fs -copyFromLocal news mrlab
[training@localhost ~]$

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilter2 extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select * from News where
     //   contains(upcase(line),'BIGDATA');
     
      String line = v.toString().toUpperCase();
     
      if(line.contains("BIGDATA") ||
               line.contains("BIG DATA"))
         con.write( v , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(RowFilter2.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/news  mrlab/bignews

[training@localhost ~]$ hadoop fs -cat mrlab/bignews/part-m-00000
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData
BigData is a hipe or real
[training@localhost ~]$

--------------------------------

 Column Filter.

 ex:
     select name, sal, dno  from emp;


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnFilter extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select name,sal,dno from emp;
      String line = v.toString();
      String[] w = line.split(",");
      String newLine = w[1]+","+
                       w[2]+","+w[4];
      con.write( new Text(newLine) , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(ColumnFilter.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/cfilter

[training@localhost ~]$ hadoop fs -cat mrlab/cfilter/part-m-00000
vino,26000,11
Sri,25000,11
mohan,13000,13
lokitha,8000,12
naga,6000,13
janaki,10000,12
[training@localhost ~]$

-----------------------------------------

Generating new Fields

 hive>
   select id, name, sal, sal*0.1 as tax,
            sal*0.2 as hra,
          sal-(sal*0.1)+(sal*0.2) as net,
        sex, dno from emp;


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GenerateNewFields extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      int sal = Integer.parseInt(w[2]);
      int tax = sal*10/100;
      int hra = sal*20/100;
      int net = sal-tax+hra;
      String newLine =w[0]+","+w[1]+","+sal+","+
                     tax+","+hra+","+net+","+
                      w[3]+","+w[4];
     
     con.write( new Text(newLine) , NullWritable.get());
        
   }
}


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(GenerateNewFields.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/newFields

[training@localhost ~]$ hadoop fs -cat mrlab/newFields/part-m-00000
101,vino,26000,2600,5200,28600,m,11
102,Sri,25000,2500,5000,27500,f,11
103,mohan,13000,1300,2600,14300,m,13
104,lokitha,8000,800,1600,8800,f,12
105,naga,6000,600,1200,6600,m,13
101,janaki,10000,1000,2000,11000,f,12
[training@localhost ~]$
-----------------------------------
transformations

hive> select id, name, sal ,
    >  if(sal>=70000,'A',
    >     if(sal>=50000,'B',
    >      if(sal>=30000,'C','D'))) as grade,
    > if(sex='m','Male','Female') as sex,
    > if(dno=11,'Marketing',
    >  if(dno=12,'hr',
    >   if(dno=13,'Finance','Other'))) as dname
    > from emp;


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Transform extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      int sal = Integer.parseInt(w[2]);
      String sex = w[3];
      int dno = Integer.parseInt(w[4]);
      String grade;
      if(sal>=70000)
           grade="A";
      else if(sal>=50000)
          grade="B";
      else if(sal>=30000)
          grade="C";
      else grade="D";
     
      if(sex.matches("m"))
          sex="Male";
      else sex="Female";
      String dname;
      switch(dno)
      {
      case 11:
           dname="Marketing";
           break;
      case 12:
           dname="Hr";
           break;
      case 13:
           dname="Finance";
           break;
      default:
           dname="Other";
      }

      String newLine = w[0]+","+
      w[1]+","+sal+","+grade+","+sex+","+dname;
      con.write(new Text(newLine), NullWritable.get());
      }
}


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(Transform.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}


[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/transform


[training@localhost ~]$ hadoop fs -cat mrlab/transform/part-m-00000
101,vino,26000,D,Male,Marketing
102,Sri,25000,D,Female,Marketing
103,mohan,13000,D,Male,Finance
104,lokitha,8000,D,Female,Hr
105,naga,6000,D,Male,Finance
101,janaki,10000,D,Female,Hr
[training@localhost ~]$












































































  


MR Lab 8 : Entire Column Agrregations, Elemination of Duplicates

performing Entire Column aggregation.

ex:
   select sum(sal) from emp;

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class SalMap extends Mapper<LongWritable,Text,Text,IntWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      int sal = Integer.parseInt(
                  line.split(",")[2]
                                   );
      con.write( new Text("Ibm"), new IntWritable(sal));
   }
}
------------


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(SalMap.class);
  j.setReducerClass(RedForSum.class);
 //j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(IntWritable.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}


[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/today1

[training@localhost ~]$ hadoop fs -cat mrlab/today1/part-r-00000
Ibm     88000

--------------------------------------------------

Eleminating duplicate rows ..based on entire row match.

[training@localhost ~]$ cat > profiles
101,aaa
102,bbb
101,aaa
101,aaa
101,aaa
102,bbb
103,ccc
101,xxx
101,aaa
[training@localhost ~]$ hadoop fs -copyFromLocal profiles mrlab
[training@localhost ~]$


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NoDupeRowsMap extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      con.write( v , NullWritable.get());
   }
}







---------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForNoDupeRows  extends Reducer<Text,NullWritable,Text,NullWritable>
{
    public void reduce(Text k,Iterable<NullWritable> vlist, Context con)
    throws IOException, InterruptedException
    {
        con.write(k, NullWritable.get());
    }

}

----
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(NoDupeRowsMap.class);
  j.setReducerClass(RedForNoDupeRows.class);
 //j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(NullWritable.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}




[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/profiles mrlab/today3



[training@localhost ~]$ hadoop fs -cat mrlab/today3/part-r-00000
101,aaa
101,xxx
102,bbb
103,ccc

---------------------------

eleminating duplicates based on some column match.



[training@localhost ~]$ hadoop fs -copyFromLocal profiles mrlab/profiles2
[training@localhost ~]$ hadoop fs -cat mrlab/profiles2
101,aaa
102,bbb
101,aaa
101,aaa
101,aaa
102,bbb
103,ccc
101,xxx
101,aaa
101,abc
101,bbc
102,def
[training@localhost ~]$



package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NoDupeKeysMap extends Mapper<LongWritable,Text,Text,Text>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      String id = w[0];
      con.write( new Text(id) , v);
   }
}

------------

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForNoDupeKeys  extends Reducer<Text,Text,Text,NullWritable>
{
    public void reduce(Text k,Iterable<Text> vlist, Context con)
    throws IOException, InterruptedException
    {
      for(Text line : vlist)
      {
          con.write(line,NullWritable.get());
          break;
      }
    }

}

----------

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(NoDupeKeysMap.class);
  j.setReducerClass(RedForNoDupeKeys.class);
 //j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(Text.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}





[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/profiles2  mrlab/today4


[training@localhost ~]$ hadoop fs -cat mrlab/today4/part-r-00000
101,aaa
102,bbb
103,ccc

------------
to get Last duplicate, do following change in reducer.

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForNoDupeKeys  extends Reducer<Text,Text,Text,NullWritable>
{
    public void reduce(Text k,Iterable<Text> vlist, Context con)
    throws IOException, InterruptedException
    {
        String line="";
      for(Text ln : vlist)
      {
          line = ln.toString();
 
      }
      con.write(new Text(line), NullWritable.get());
     
    }

}
--------


[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/profiles2  mrlab/today5


[training@localhost ~]$ hadoop fs -cat mrlab/today5/part-r-00000
101,bbc
102,def
103,ccc

-------------------------------















Thursday, 22 September 2016

MR Lab7 : Joins Using MapReduce



[training@localhost ~]$ cat > dept
11,marketing,hyd
12,hr,del
13,finance,hyd
[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop fs -mkdir joins
[training@localhost ~]$ hadoop fs -copyFromLocal dept emp joins
[training@localhost ~]$ hadoop fs -ls joins
Found 2 items
-rw-r--r--   1 training supergroup         42 2016-09-22 06:47

/user/training/joins/dept
-rw-r--r--   1 training supergroup        123 2016-09-22 06:47

/user/training/joins/emp
[training@localhost ~]$

--------------
JoinMapper.java
------------------
package mr.analytics;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class JoinMapper extends

Mapper<LongWritable,Text,Text,NullWritable>
{
    HashMap<Integer,String> hm = new HashMap<Integer,String>();
    public void setup(Context con)
     throws IOException, InterruptedException ,

FileNotFoundException
     {
  Path[] p = DistributedCache.getLocalCacheFiles
                         (con.getConfiguration());
   FileInputStream fs = new FileInputStream(p[0].toString());
   InputStreamReader isr = new InputStreamReader(fs);
   BufferedReader br = new BufferedReader(isr);
   String line="";
   while((line =br.readLine())!=null)
   {
       // 11,mrketing,hyd
       String[] w = line.split(",");
       int dno = Integer.parseInt(w[0]);
       String dinfo = w[1]+","+w[2];
       hm.put(dno, dinfo);
   }
   br.close();
  }
 public void map(LongWritable k, Text v, Context con)
  throws IOException, InterruptedException
  {
     // 101,aaa,20000,m,11
     String line = v.toString();
     String[] w = line.split(",");
     int dno = Integer.parseInt(w[4]);
     String dinfo = hm.get(dno);
     String info = line+","+dinfo;
     con.write(new Text(info), NullWritable.get());
  }
}

--------------
JoinDriver.java
----------------

package mr.analytics;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinDriver
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "Merging");
         j.setJarByClass(MergedSum.class);
         j.setMapperClass(JoinMapper.class);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(NullWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // dept
         Path p3 = new Path(args[2]); // output
       
FileInputFormat.addInputPath(j, p1);
DistributedCache.addCacheFile(new URI(p2.toString()),
           j.getConfiguration());
FileOutputFormat.setOutputPath(j, p3);       

System.exit(j.waitForCompletion(true) ? 0:1);
     }

}

-----------------------

[training@localhost ~]$ hadoop jar Desktop/myapp.jar 

mr.analytics.JoinDriver    joins/emp joins/dept  joins/result

[training@localhost ~]$ hadoop fs -ls joins
Found 3 items
-rw-r--r--   1 training supergroup         42 2016-09-22 06:47

/user/training/joins/dept
-rw-r--r--   1 training supergroup        123 2016-09-22 06:47

/user/training/joins/emp
drwxr-xr-x   - training supergroup          0 2016-09-22 06:56

/user/training/joins/result
[training@localhost ~]$ hadoop fs -ls joins/result
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-09-22 06:56

/user/training/joins/result/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-09-22 06:56

/user/training/joins/result/_logs
-rw-r--r--   1 training supergroup        189 2016-09-22 06:56

/user/training/joins/result/part-r-00000
[training@localhost ~]$ hadoop fs -cat joins/result/part-r-00000
101,janaki,10000,f,12,hr,del
101,vino,26000,m,11,marketing,hyd
102,Sri,25000,f,11,marketing,hyd
103,mohan,13000,m,13,finance,hyd
104,lokitha,8000,f,12,hr,del
105,naga,6000,m,13,finance,hyd
[training@localhost ~]$

-------------------------------------


MR Lab6 : Sorting Using MapReduce

hive> select * from emp
        order by sal desc;

emp---->id, name, sal, sex,dno

SortSalDriver.java
--------------------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortSalDriver
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "SortOnValueDescending");
  j.setJarByClass(SortSalDriver.class);

  j.setMapperClass(SortSalMap.class);
  j.setReducerClass(SortSalRed.class);
  j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(IntWritable.class);
  j.setOutputValueClass(Text.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}
----
SortSalMap.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortSalMap extends
Mapper<LongWritable,Text,IntWritable,Text>
{
    public void map(LongWritable k,
             Text v , Context con)
     throws IOException, InterruptedException
     {
         //   v  --> 101,aaa,40000,m,11
        String line = v.toString();
        String[] w = line.split(",");
        int sal = Integer.parseInt(w[2]);
        con.write(new IntWritable(sal),v);
     }
}


---------

SortSalRed.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortSalRed extends Reducer<IntWritable,
 Text,Text,NullWritable>
{
    public void reduce(IntWritable sal, Iterable<Text> vlist,
            Context con)
    throws IOException, InterruptedException
    {
        for(Text rec: vlist)
            con.write(rec,NullWritable.get());
    }

}

---------------
SortComparator.java

package mr.analytics;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {

 protected SortComparator() {
  super(IntWritable.class, true);
  // TODO Auto-generated constructor stub
 }

 @Override
 public int compare(WritableComparable o1, WritableComparable o2) {
  IntWritable k1 = (IntWritable) o1;
  IntWritable k2 = (IntWritable) o2;
  int cmp = k1.compareTo(k2);
  return -1* cmp;
 }


}
---------------------------------------------------
hive> select sex, sum(sal) as tot from emp
         group by sex
       order by tot desc;



package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortDriver2
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "SortOnValueDescending");
  j.setJarByClass(SortDriver2.class);

  j.setMapperClass(Map1.class);
  j.setReducerClass(RedForSum.class);
  //j.setSortComparatorClass(SortComparator.class);
  j.setMapOutputKeyClass(Text.class);
  j.setMapOutputValueClass(IntWritable.class);
 
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(IntWritable.class);

  //   file1   res1   res2
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  j.waitForCompletion(true);
 
  Job j2 = new Job(c, "SortOnValueDescending");
  j2.setJarByClass(SortDriver2.class);

  j2.setMapperClass(SortMapper.class);
  j2.setReducerClass(SortReducer.class);
  j2.setSortComparatorClass(SortComparator.class);
  j2.setOutputKeyClass(IntWritable.class);
  j2.setOutputValueClass(Text.class);

 
  FileInputFormat.addInputPath(j2, new Path(args[1]));
  FileOutputFormat.setOutputPath(j2, new Path(args[2]));
 
  System.exit(j2.waitForCompletion(true) ? 0:1);
 }

}
-------------

Map1.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp1, emp3
// id,name,sal,sex,dno
public class Map1 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[2]);
   
    con.write(new Text(sex), new IntWritable(sal));
 }
}
-----------
RedForSum.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class RedForSum extends Reducer<Text,IntWritable,
 Text,IntWritable>
{
    public void reduce(Text k, Iterable<IntWritable>vlist, Context con)
     throws IOException, InterruptedException
     {
        int tot = 0;
         for (IntWritable v:vlist)
             tot+=v.get();
         con.write(k, new IntWritable(tot));
     }

}


-------------
SortMapper.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  //   sex     sal
   
  String[] splits = value.toString().trim().split("\t");
  int tot = Integer.parseInt(splits[1]);
  context.write(new IntWritable(tot),
          new Text(splits[0]));
 }

}
------------------

SortReducer.java

package mr.analytics;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends
  Reducer<IntWritable, Text, Text, IntWritable> {

 @Override
 public void reduce(IntWritable key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {
            
   
  for (Text val : values) {
   context.write(val, key);
  }
 }
}

------------------------




Thursday, 8 September 2016

MR Lab5: Merging, MergedAggregations

[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$

[training@localhost ~]$ cat > emp2
201,aaa,11,m,90000
202,bbbbb,12,f,100000
203,ccc,13,m,200000
[training@localhost ~]$ cat > emp3
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13
[training@localhost ~]$ hadoop fs -mkdir mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp2 mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp3 mrlab
[training@localhost ~]$ hadoop fs -ls mrlab
Found 3 items
-rw-r--r--   1 training supergroup        123 2016-09-07 20:13 /user/training/mrlab/emp
-rw-r--r--   1 training supergroup         61 2016-09-07 20:13 /user/training/mrlab/emp2
-rw-r--r--   1 training supergroup         63 2016-09-07 20:13 /user/training/mrlab/emp3
[training@localhost ~]$

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

// emp1, emp3
// id,name,sal,sex,dno
public class MergeMap1 extends
 Mapper<LongWritable,Text,Text,NullWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
    con.write(v, NullWritable.get());
 }
}

-----------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

// emp2
// id,name,dno,sex,sal
public class MergeMap2 extends
 Mapper<LongWritable,Text,Text,NullWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String newLine = w[0]+","+
     w[1]+","+w[4]+","+w[3]+","+w[2];
    con.write(new Text(newLine), NullWritable.get());
 }
}


------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "Merging");
         j.setJarByClass(Merge.class);
         j.setNumReduceTasks(1);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(NullWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // emp2
         Path p3 = new Path(args[2]); // emp3
         Path p4 = new Path(args[3]); //output
MultipleInputs.addInputPath(j,p1,TextInputFormat.class,MergeMap1.class);
MultipleInputs.addInputPath(j,p2,TextInputFormat.class,MergeMap2.class);
MultipleInputs.addInputPath(j,p3,TextInputFormat.class,MergeMap1.class);

FileOutputFormat.setOutputPath(j, p4);       

System.exit(j.waitForCompletion(true) ? 0:1);
     }

}


------------
[training@localhost ~]$ hadoop jar Desktop/myapp.jar  mr.analytics.Merge  mrlab/emp mrlab/emp2   mrlab/emp3  mrlab/content

[training@localhost ~]$ hadoop fs -cat mrlab/content/part-r-00000
101,janaki,10000,f,12
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
201,aaa,90000,m,11
202,bbbbb,100000,f,12
203,ccc,200000,m,13
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13

--------------------------------

hql:

  select sex, sum(sal) from (
     select sex, sal from emp1
         union all
     select sex, sal from emp2
         union all
 select sex, sal from emp3 )
         e  group by sex;
   
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp1, emp3
// id,name,sal,sex,dno
public class Map1 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[2]);
   
    con.write(new Text(sex), new IntWritable(sal));
 }
}

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp2
// id,name,dno,sex,sal
public class Map2 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[4]);
   
    con.write(new Text(sex), new IntWritable(sal));
 }
}

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class RedForSum extends Reducer<Text,IntWritable,
 Text,IntWritable>
{
    public void reduce(Text k, Iterable<IntWritable>vlist, Context con)
     throws IOException, InterruptedException
     {
        int tot = 0;
         for (IntWritable v:vlist)
             tot+=v.get();
         con.write(k, new IntWritable(tot));
     }

}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergedSum
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "Merging");
         j.setJarByClass(MergedSum.class);
         j.setReducerClass(RedForSum.class);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(IntWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // emp2
         Path p3 = new Path(args[2]); // emp3
         Path p4 = new Path(args[3]); //output
MultipleInputs.addInputPath(j,p1,TextInputFormat.class,Map1.class);
MultipleInputs.addInputPath(j,p2,TextInputFormat.class,Map2.class);
MultipleInputs.addInputPath(j,p3,TextInputFormat.class,Map1.class);

FileOutputFormat.setOutputPath(j, p4);       

System.exit(j.waitForCompletion(true) ? 0:1);
     }

}




  
[training@localhost ~]$ hadoop jar Desktop/myapp.jar  mr.analytics.MergedSum  mrlab/emp mrlab/emp2   mrlab/emp3  mrlab/res1


[training@localhost ~]$ hadoop fs -cat mrlab/res1/part-r-00000
f       163000
m       346000

--------------------------------









Saturday, 3 September 2016

Hive : Hive Indexes

sreeram-hadoop.blogspot.in
sankara.deva2016@gmail.com
------------------------------------------------------------------
hive> create table sales(cid int, pid string, amt int)
     row format delimited
       fields terminated by ',';
OK
Time taken: 11.849 seconds
hive> load data local inpath 'sales' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=1, totalSize=192]
OK
Time taken: 1.142 seconds
hive> select * from sales;
OK
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
Time taken: 0.508 seconds, Fetched: 16 row(s)
hive>


hive> create INDEX cid_index on TABLE
      sales(cid)
     as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
     WITH deferred rebuild;
OK
Time taken: 0.351 seconds
hive>

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> show tables;
OK
damp
default__sales_cid_index__
mamp
mytab
ramp
sales

hive> describe default__sales_cid_index__;
OK
cid                     int                                        
_bucketname             string                                     
_offsets                array<bigint>                             


hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]

[cloudera@quickstart ~]$ cat sales > sales001


hive> load data local inpath 'sales001' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=2, totalSize=384]
OK
Time taken: 0.248 seconds
hive> select * from sales;
OK
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
Time taken: 0.073 seconds, Fetched: 32 row(s)
hive>

hive> ALTER INDEX  cid_index  ON sales REBUILD;


hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
Time taken: 0.086 seconds, Fetched: 6 row(s)
hive>

[cloudera@quickstart ~]$ cat sales002
101,p1,1000
101,p1,1200
101,p3,5000
101,p4,6000
101,p7,9000
105,p1,9000
105,p4,10000
105,p6,9000
[cloudera@quickstart ~]$

hive> load data local inpath 'sales002' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=3, totalSize=481]
OK
Time taken: 0.233 seconds
hive> select * from sales;
OK
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
105    p1    9000
105    p4    10000
105    p6    9000
Time taken: 0.085 seconds, Fetched: 40 row(s)
hive>

-- in above table , 105  is available only in bucket3(sales002)

hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
Time taken: 0.074 seconds, Fetched: 6 row(s)
hive>

-- in above output no information about 3rd bucket.
 -- bcoz, index is not rebuild.


hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales002    [0,12,24,36,48]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
105    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales002    [60,72,85]
Time taken: 0.081 seconds, Fetched: 8 row(s)
hive>

-- after rebuild index, bucket3(sales002) information available.


hive> select * from sales where cid=105;

-- now it reads only bucket3(sales002).

-----------------------------------------------------------------
sreeram-hadoop.blogspot.in
sankara.deva2016@gmail.com
----------------------------------------------------------------

Hive Bucketing tables and Indexes.
-----------------------------------------------------------------

hive> create table bucks_sales(cid int, pid string,
       amt int)
    > clustered by (pid)
    > into 4 buckets
    > row format delimited
    >   fields terminated by ',';
OK
Time taken: 0.077 seconds
hive>

hive> set hive.enforce.bucketing=true;
hive> insert overwrite table bucks_sales
    >   select * from sales;

[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/bucks_sales
Found 4 items
-rwxrwxrwx   1 cloudera supergroup         73 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000000_0
-rwxrwxrwx   1 cloudera supergroup        204 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000001_0
-rwxrwxrwx   1 cloudera supergroup         84 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000002_0
-rwxrwxrwx   1 cloudera supergroup        120 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000003_0
[cloudera@quickstart ~]$

-- now data(all rows) divided into 4 buckets.

[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000000_0
105,p4,10000
101,p4,6000
101,p4,6000
101,p4,6000
102,p4,6000
102,p4,6000
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000001_0
101,p1,1000
105,p1,9000
101,p1,1200
101,p1,1000
101,p1,5000
103,p1,4000
103,p1,2000
103,p1,1000
101,p1,1200
102,p1,2000
101,p1,1000
101,p1,5000
103,p1,4000
103,p1,2000
103,p1,1000
101,p1,1200
102,p1,2000
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000002_0
102,p2,4000
101,p2,3000
102,p2,4000
105,p6,9000
102,p2,4000
102,p2,4000
101,p2,3000
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000003_0
101,p3,5000
102,p3,5000
102,p3,5000
103,p3,4000
101,p7,9000
101,p7,9000
101,p7,9000
101,p3,5000
101,p3,5000
103,p3,4000
[cloudera@quickstart ~]$


-- in above output,
 all p4 s available in bucket1 (000000_0)

 all p1 s available in bucket2 (000001_0)

 all p2 and p6 available in bucket3 (000002_0)

 all p3 and p7 available in bucket4 (000003_0)


hive> select * from bucks_sales where pid='p3';
-- to read p3 rows, hive will read all buckets of the table.
 -- bcoz, hive does not know in which bucket 'p3' s available.

thats why,
  lets create index object on bucks_sales table on column pid.

hive> create index pid_index on table bucks_sales(pid)
     as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
     with deferred rebuild;

hive> show tables;
OK
bucks_sales
damp
default__bucks_sales_pid_index__
default__sales_cid_index__
mamp
mytab
ramp
sales
Time taken: 0.032 seconds, Fetched: 8 row(s)
hive> select * from default__bucks_sales_pid_index__;
OK
Time taken: 0.089 seconds
hive>

-- now index table is empty. bcoz, index is not rebuild(altered).


hive> ALTER INDEX  pid_index  ON bucks_sales REBUILD;

hive> select * from default__bucks_sales_pid_index__;
OK
p1    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000001_0    [0,12,24,36,48,60,72,84,96,108,120,132,144,156,168,180,192]
p2    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000002_0    [0,12,24,48,60,72]
p3    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000003_0    [0,12,24,36,84,96,108]
p4    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000000_0    [0,13,25,37,49,61]
p6    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000002_0    [36]
p7    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000003_0    [48,60,72]
Time taken: 0.072 seconds, Fetched: 6 row(s)
hive>

hive> select * from bucks_sales where pid='p3';
OK
101    p3    5000
102    p3    5000
102    p3    5000
103    p3    4000
101    p3    5000
101    p3    5000
103    p3    4000
Time taken: 0.078 seconds, Fetched: 7 row(s)
hive>

-- when you ask 'p3' data hive will read only from  4th bucket    (000003_0)

------------------------------------------------------------------

sreeram-hadoop.blogspot.in
sankara.deva2016@gmail.com
----------------------------------------------------------------




















Tuesday, 30 August 2016

MR Lab3

grouping By Multiple columns.

ex:

 select dno, sex, sum(sal) from emp
   group by dno, sex;


DnoSexSalMap.java
--------------------
package mr.analytics;

import java.io.IOException;

import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
public class DnoSexSalMap extends

Mapper
 <LongWritable,Text,Text,IntWritable>
{
     //  file : emp
     // schema : id,name,sal,sex,dno
    //  delimiter : "," (comma)
 //  sample row : 101,amar,20000,m,11
 //   sex as key, sal as value.
    public void map(LongWritable 
k,Text v,
            Context  con)
     throws IOException,
InterruptedException
     {
        String line =
v.toString();
      String[] w = line.split(",");   
      String sex = w[3];
      String dno = w[4];
      String myKey = dno+"\t"+sex;
     int sal =Integer.parseInt(w[2]);
    con.write(new Text(myKey),new
IntWritable(sal));
     }
  }

----------------
Driver8.java
----------------

package mr.analytics;

import

org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import

org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import

org.apache.hadoop.mapreduce.lib.input.F

ileInputFormat;
import

org.apache.hadoop.mapreduce.lib.output.

FileOutputFormat;

public class Driver8
{
    public static void main(String

[] args)
     throws Exception
     {
        Configuration c = new

Configuration();
        Job j = new Job

(c,"d8");
        j.setJarByClass

(Driver8.class);
        j.setMapperClass

(DnoSexSalMap.class);
        j.setReducerClass

(RedForSum.class);
        j.setOutputKeyClass

(Text.class);
        j.setOutputValueClass

(IntWritable.class);
         Path p1 = new Path

(args[0]); //input
         Path p2 = new Path

(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ?

0:1);
  }
}

--------------------------

submit:

[training@localhost ~]$ hadoop fs -cat

mrlab/r8/part-r-00000
11      f       25000
11      m       26000
12      f       18000
13      m       19000

______________________________