Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Tuesday 30 August 2016

MR Lab3

grouping By Multiple columns.

ex:

 select dno, sex, sum(sal) from emp
   group by dno, sex;


DnoSexSalMap.java
--------------------
package mr.analytics;

import java.io.IOException;

import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
public class DnoSexSalMap extends

Mapper
 <LongWritable,Text,Text,IntWritable>
{
     //  file : emp
     // schema : id,name,sal,sex,dno
    //  delimiter : "," (comma)
 //  sample row : 101,amar,20000,m,11
 //   sex as key, sal as value.
    public void map(LongWritable 
k,Text v,
            Context  con)
     throws IOException,
InterruptedException
     {
        String line =
v.toString();
      String[] w = line.split(",");   
      String sex = w[3];
      String dno = w[4];
      String myKey = dno+"\t"+sex;
     int sal =Integer.parseInt(w[2]);
    con.write(new Text(myKey),new
IntWritable(sal));
     }
  }

----------------
Driver8.java
----------------

package mr.analytics;

import

org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import

org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import

org.apache.hadoop.mapreduce.lib.output.

FileOutputFormat;

public class Driver8
{
    public static void main(String

[] args)
     throws Exception
     {
        Configuration c = new

Configuration();
        Job j = new Job

(c,"d8");
        j.setJarByClass

(Driver8.class);
        j.setMapperClass

(DnoSexSalMap.class);
        j.setReducerClass

(RedForSum.class);
        j.setOutputKeyClass

(Text.class);
        j.setOutputValueClass

(IntWritable.class);
         Path p1 = new Path

(args[0]); //input
         Path p2 = new Path

(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ?

0:1);
  }
}

--------------------------

submit:

[training@localhost ~]$ hadoop fs -cat

mrlab/r8/part-r-00000
11      f       25000
11      m       26000
12      f       18000
13      m       19000

______________________________



MR Lab2

[training@localhost ~]$ ls emp
emp
[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop fs -copyFromLocal emp mrlab

task:-
   for each sex group total salary.

hql :
  select sex, sum(sal) from emp
    group by sex;

SexSalMap.java
__________________

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.StringTokenizer;
public class SexSalMap extends Mapper
 <LongWritable,Text,Text,IntWritable>
{
     //  file : emp
     // schema : id,name,sal,sex,dno
    //  delimiter : "," (comma)
 //  sample row : 101,amar,20000,m,11
 //   sex as key, sal as value.
    public void map(LongWritable  k,Text v,
            Context  con)
     throws IOException, InterruptedException
     {
        String line = v.toString();
      String[] w = line.split(",");   
      String sex = w[3];
     int sal =Integer.parseInt(w[2]);
    con.write(new Text(sex),new IntWritable(sal));
     }
  }
}

Reducer for Sum.

RedForSum.java
_________________

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForSum extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int tot=0;
       for(IntWritable v: vlist)
          tot+=v.get();
       con.write(k, new IntWritable(tot));
   }
}

Driver2.java
________________
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver2
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d2");
        j.setJarByClass(Driver2.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}

________________________________

Export the project into Desktop/myapp.jar

------------------------------

submitting job:

[training@localhost ~]$ hadoop jar \
>  Desktop/myapp.jar \
>  mr.analytics.Driver2 \
>  mrlab/emp \
>  mrlab/r1

[training@localhost ~]$ hadoop fs -ls mrlab/r1
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-08-30 20:24 /user/training/mrlab/r1/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-08-30 20:23 /user/training/mrlab/r1/_logs
-rw-r--r--   1 training supergroup         16 2016-08-30 20:24 /user/training/mrlab/r1/part-r-00000
[training@localhost ~]$ hadoop fs -cat mrlab/r1/part-r-00000
f       43000
m       45000
[training@localhost ~]$

______________________________________
Task2:
   Mapper for dno as key, sal value.

DnoSalMap.java
__________________________

 package mr.analytics;

import java.io.IOException;

import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
public class DnoSalMap extends Mapper
 <LongWritable,Text,Text,IntWritable>
{
     //  file : emp
     // schema : id,name,sal,sex,dno
    //  delimiter : "," (comma)
 //  sample row : 101,amar,20000,m,11
 //   dno as key, sal as value.
    public void map(LongWritable 
k,Text v,
            Context  con)
     throws IOException,
InterruptedException
     {
        String line =
v.toString();
      String[] w = line.split(",");   
      String dno = w[4];
     int sal =Integer.parseInt(w[2]);
    con.write(new Text(dno),new
IntWritable(sal));
     }
  }
________________________
aggregation: Sum.
  already we have RedForSum
_______________________
Driver3.java


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver3
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d3");
        j.setJarByClass(Driver3.class);
        j.setMapperClass(DnoSalMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}

---------------------
[training@localhost ~]$ hadoop jar  Desktop/myapp.jar  mr.analytics.Driver3  mrlab/emp  mrlab/r2

[training@localhost ~]$ hadoop fs -cat mrlab/r2/part-r-00000
11      51000
12      18000
13      19000
[training@localhost ~]$

______________________________

Task4:

hql:
  select sex, avg(sal) from emp
   group by sex;

mapper--> SexSalMap  (existed)
Reducer --> RedForAvg
Driver4.


RedForAvg.java
---------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForAvg extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int tot=0;
       int cnt=0;
       for(IntWritable v: vlist)
       {
          tot+=v.get();
          cnt++;
       }
       int avg = tot/cnt;
       con.write(k, new IntWritable(avg));
   }
}

Driver4.java
-------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver4
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d4");
        j.setJarByClass(Driver4.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForAvg.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}

---------------------------

submit:

[training@localhost ~]$ hadoop jar  Desktop/myapp.jar  mr.analytics.Driver4  mrlab/emp  mrlab/r3



[training@localhost ~]$ hadoop fs -cat mrlab/r3/part-r-00000
f       14333
m       15000

_____________________________

Task5:

  select sex,  count(*) from emp
    group by sex;

Mapper:   SexSalMap
Reducer:  RedForCnt


RedForCnt.java
-----------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForCnt extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int tot=0;
       for(IntWritable v: vlist)
          tot++;
       con.write(k, new IntWritable(tot));
   }
}

Driver5.java
------------------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver5
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d5");
        j.setJarByClass(Driver5.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForCnt.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}

-----------------------------

submit:

[training@localhost ~]$ hadoop jar  Desktop/myapp.jar  mr.analytics.Driver5  mrlab/emp  mrlab/r5

[training@localhost ~]$ hadoop fs -cat mrlab/r5/part-r-00000
f       3
m       3

____________________________________

Task6:
 
hql:
  select sex, max(sal) from emp
   group by sex;

mapper:  SexSalMap
Reducer: RedForMax

RedForMax.java
------------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForMax extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int m=0;
       int n =0;
       for(IntWritable v: vlist)
       {
           n++;
           if(n==1) m=v.get();
           m = Math.max(m, v.get());
       }
         
       con.write(k, new IntWritable(m));
   }
}
-----------------------
Driver6.java
------------

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver6
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d6");
        j.setJarByClass(Driver6.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForMax.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}

---------------------
submit

[training@localhost ~]$ hadoop jar  Desktop/myapp.jar  mr.analytics.Driver6  mrlab/emp  mrlab/r6




[training@localhost ~]$ hadoop fs -cat mrlab/r6/part-r-00000
f       25000
m       26000

---------------------------------

Task7:

hql:
  select sex, min(sal) from emp
   group by sex;

Mapper:  SexSalMap
Reducer:  RedForMin

RedForMin.java
---------------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForMin extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int m=0;
       int n =0;
       for(IntWritable v: vlist)
       {
           n++;
           if(n==1) m=v.get();
           m = Math.min(m, v.get());
       }
         
       con.write(k, new IntWritable(m));
   }
}

---------------------

Driver7.java
---------------

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver7
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d7");
        j.setJarByClass(Driver7.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForMin.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}

-----------------

submit:

[training@localhost ~]$ hadoop jar  Desktop/myapp.jar  mr.analytics.Driver7  mrlab/emp  mrlab/r7

[training@localhost ~]$ hadoop fs -cat mrlab/r7/part-r-00000
f       8000
m       6000

---------------------------------------






















Spark Lab2

scala> val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/emp")

scala> val earray = emp.map(x=> x.split(","))
earray: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

scala> earray.collect

Array[Array[String]] = Array(Array(101, aa, 20000, m, 11), Array(102, bb, 30000, f, 12), Array(103, cc, 40000, m, 11), Array(104, ddd, 50000, f, 12), Array(105, ee, 60000, m, 12), Array(106, dd, 90000, f, 11))

scala> val epair = earray.map( x =>
     |   (x(4), x(2).toInt))

scala> val ressum = epair.reduceByKey(_+_)

scala> val resmax = epair.reduceByKey(Math.max(_,_))

scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> ressum.collect.foreach(println)
(12,140000)
(11,150000)

scala> val grpByDno =
      epair.groupByKey()

scala> grpByDno.collect
 Array[(String, Iterable[Int])] = Array((12,CompactBuffer(30000, 50000, 60000)), (11,CompactBuffer(20000, 40000, 90000)))

scala> val resall = grpByDno.map(x => 
          x._1+"\t"+
          x._2.sum+"\t"+
         x._2.size+"\t"+
         x._2.sum/x._2.size+"\t"+
         x._2.max+"\t"+
         x._2.min  )

12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000

[cloudera@quickstart ~]$ hadoop fs -cat spark/today1/part-00000
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000
[cloudera@quickstart ~]$ 

____________________________________


aggregations by multiple grouping.

ex: equivalent sql/hql query.

select dno, sex , sum(sal) from emp
  group by dno, sex;
---
scala> val DnoSexSalPair = earray.map(
     |   x => ((x(4),x(3)),x(2).toInt) )
scala> DnoSexSalPair.collect.foreach(println)

((11,m),20000)
((12,f),30000)
((11,m),40000)
((12,f),50000)
((12,m),60000)
((11,f),90000)

scala> val rsum = DnoSexSalPair.reduceByKey(_+_)

scala> rsum.collect.foreach(println)

((11,f),90000)
((12,f),80000)
((12,m),60000)
((11,m),60000)

scala> val rs = rsum.map( x =>
     x._1._1+"\t"+x._1._2+"\t"+
                x._2 )
  
scala> rs.collect.foreach(println)

11 f 90000
12 f 80000
12 m 60000
11 m 60000


_______________________________________

grouping by multiple columns, and multiple aggregations.

Assignment:

 select dno, sex, sum(sal), max(sal) ,
  min(sal), count(*), avg(sal)
   from emp group by dno, sex;

val grpDnoSex = 
    DnoSexSalPair.groupByKey();

val r = grpDnoSex.map( x => 
     x._1._1+"\t"+
           x._1._2+"\t"+
    x._2.sum+"\t"+
    x._2.max+"\t"+
x._2.min+"\t"+
    x._2.size+"\t"+
x._2.sum/x._2.size  )
r.collect.foreach(println)

11 f 90000 90000 90000 1 90000
12 f 80000 50000 30000 2 40000
12 m 60000 60000 60000 1 60000
11 m 60000 40000 20000 2 30000
______________________________________


    
    
    
    



___________________________________













MR Lab1 : WordCount



[training@localhost ~]$ cat > comment
i love hadoop
i love java
i hate coding
[training@localhost ~]$ hadoop fs -mkdir mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal comment mrlab
[training@localhost ~]$ hadoop fs -cat mrlab/comment
i love hadoop
i love java
i hate coding
[training@localhost ~]$

___________________________

Eclipse steps :

create java project:

file--> new --> java project
  ex: MyMr

configure jar files.

MyMr--> src --> Build Path --> Configure build Path
      ---> Libraries ---> add external jar files.

  select the following jars.
   /usr/lib/hadoop-*.*/hadoop-core.jar
  /usr/lib/hadoop-*.*/lib/commons-cli-*.*.jar

create package :

MyMr --> src --> new --> package
 ex: mr.analytics

create java class :

MyMr --> src --> mr.analytics --> new --> class
  ex: WordMap

__________________________________

WordMap.java:
 is a mapper program, which writes word as key and 1 as value.
----------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.StringTokenizer;
public class WordMap extends Mapper
 <LongWritable,Text,Text,IntWritable>
{
     // i love hadoop
    public void map(LongWritable  k,Text v,
            Context  con)
     throws IOException, InterruptedException
     {
        String line = v.toString();
    StringTokenizer t = new StringTokenizer(line);
     while(t.hasMoreTokens())
     {
         String word = t.nextToken();
         con.write(new Text(word),new IntWritable(1));
     }
  }
}
---------------------
RedForSum.java
  is Reducer program, which will give sum aggregation.

----------------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForSum extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int tot=0;
       for(IntWritable v: vlist)
          tot+=v.get();
       con.write(k, new IntWritable(tot));
   }
}
---------------------------

Driver1.java

 is Driver program, to call mapper and reducer
-----------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver1
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"MyFirst");
        j.setJarByClass(Driver1.class);
        j.setMapperClass(WordMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}
---------------------

Now export all these 3 classes into jar file.

MyMr --> export --> jar -->java jar
  ex: /home/training/Desktop/myapp.jar

-------------------------------

submitting mr job:

[training@localhost ~]$ hadoop jar Desktop/myapp.jar \
>  mr.analytics.Driver1 \
>  mrlab/comment \
>  mrlab/res1

[training@localhost ~]$ hadoop fs -ls mrlab/res1
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-08-30 05:27

/user/training/mrlab/res1/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-08-30 05:26

/user/training/mrlab/res1/_logs
-rw-r--r--   1 training supergroup         58 2016-08-30 05:27

/user/training/mrlab/res1/part-r-00000

[training@localhost ~]$ hadoop fs -cat mrlab/res1/part-r-00000
coding  1
hadoop  1
hate    1
i       3
java    1
love    2















spark lab1 : Spark Aggregations : map, flatMap, sc.textFile(), reduceByKey(), groupByKey()

spark Lab1:
___________
[cloudera@quickstart ~]$ cat > comment
i love hadoop
i love spark
i love hadoop and spark
[cloudera@quickstart ~]$ hadoop fs -mkdir spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal comment spark

Word Count using spark:

scala> val r1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/comment")

scala> r1.collect.foreach(println)

scala> val r2 = r1.map(x => x.split(" "))

scala> val r3 = r2.flatMap(x => x)

instead of writing r2 and r3.

scala> val words  = r1.flatMap(x =>
     |    x.split(" ") )

scala> val wpair = words.map( x =>
     |    (x,1) )

scala> val wc = wpair.reduceByKey((x,y) => x+y)

scala> wc.collect

scala> val wcres = wc.map( x =>
     |     x._1+","+x._2 )

scala> wcres.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/results2")


[cloudera@quickstart ~]$ cat emp
101,aa,20000,m,11
102,bb,30000,f,12
103,cc,40000,m,11
104,ddd,50000,f,12
105,ee,60000,m,12
106,dd,90000,f,11
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spark
[cloudera@quickstart ~]$

scala> val e1 = sc.textFile("/user/cloudera/spark/emp")

scala> val e2 = e1.map(_.split(","))

scala> val epair = e2.map( x=>
     |   (x(3), x(2).toInt ) )

scala> val res = epair.reduceByKey(_+_)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:24

scala> res.collect.foreach(println)
(f,170000)
(m,120000)

scala> val resmax = epair.reduceByKey(
     |    (x,y) => Math.max(x,y))


scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> resmax.collect.foreach(println)
(f,90000)
(m,60000)

scala> resmin.collect.foreach(println)
(f,30000)
(m,20000)

scala> val grpd = epair.groupByKey()



scala> val resall = grpd.map(x =>
     |  (x._1, x._2.sum,x._2.size,x._2.max,x._2.min,x._2.sum/x._2.size) )
scala> resall.collect.foreach(println)










Friday 19 August 2016

Spark Sql and Hql

[cloudera@quickstart ~]$ sudo find / -name 

'hive-site.xml'

[cloudera@quickstart ~]$ sudo chmod -R 777 

/usr/lib/spark/conf
[cloudera@quickstart ~]$ cp 

/etc/hive/conf.dist/hive-site.xml 

/usr/lib/spark/conf
_____________________________________
from hive-site.xml -->  

hive.metastore.warehouse.dir

From Spark 2.0.0 onwards the above option is deprecated;
    use the following option instead:

------>   spark.sql.warehouse.dir
_____________________________________________

____  [ tested in cloudera 5.8 
          spark version 1.6.0 ]

[cloudera@quickstart ~]$ls 

/usr/lib/hue/apps/beeswax/data/sample_07.csv

[cloudera@quickstart ~]$ head -n 2 

/usr/lib/hue/apps/beeswax/data/sample_07.csv
_____________________
val hq  = new  

org.apache.spark.sql.hive.HiveContext(sc)

hq.sql("create database sparkdb")

hq.sql("CREATE TABLE sample_07 (code  

 string,description string,total_emp    

int,salary int) ROW FORMAT DELIMITED FIELDS  

TERMINATED BY '\t' STORED AS TextFile")

 [cloudera@quickstart ~]$ hadoop fs -mkdir    

sparks

 [cloudera@quickstart ~]$ hadoop fs -copyFromLocal /usr/lib/hue/apps/beeswax/data/sample_07.csv sparks
 [cloudera@quickstart ~]$ hadoop fs -ls    

sparks

 hq.sql("LOAD DATA INPATH  

'/user/cloudera/sparks/sample_07.csv'  

OVERWRITE INTO TABLE sample_07")

 val df = hq.sql("SELECT * from sample_07")

__________________________________________
scala> df.filter(df("salary") > 150000).show

()
+-------+--------------------+---------

+------+
|   code|         description|total_emp|

salary|
+-------+--------------------+---------

+------+
|11-1011|    Chief executives|   299160|

151370|
|29-1022|Oral and maxillof...|     5040|

178440|
|29-1023|       Orthodontists|     5350|

185340|
|29-1024|     Prosthodontists|      380|

169360|
|29-1061|   Anesthesiologists|    31030|

192780|
|29-1062|Family and genera...|   113250|

153640|
|29-1063| Internists, general|    46260|

167270|
|29-1064|Obstetricians and...|    21340|

183600|
|29-1067|            Surgeons|    50260|

191410|
|29-1069|Physicians and su...|   237400|

155150|
+-------+--------------------+---------

+------+
____________________________________________


val sqlContext = new 

org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._


[cloudera@quickstart ~]$ gedit json1
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal json1 sparks
[cloudera@quickstart ~]$ hadoop fs -cat 

sparks/json1
{"name":"Ravi","age":23,"sex":"M"}
{"name":"Rani","age":24,"sex":"F"}
{"name":"Mani","sex":"M"}
{"name":"Vani","age":34}
{"name":"Veni","age":29,"sex":"F"}
[cloudera@quickstart ~]$ 

scala> val df = sqlContext.read.json

("/user/cloudera/sparks/json1")

scala> df.show()
+----+----+----+
| age|name| sex|
+----+----+----+
|  23|Ravi|   M|
|  24|Rani|   F|
|null|Mani|   M|
|  34|Vani|null|
|  29|Veni|   F|
+----+----+----+

scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)

scala> df.select("name").show()
+----+
|name|
+----+
|Ravi|
|Rani|
|Mani|
|Vani|
|Veni|
+----+


scala> df.select("age").show()
+----+
| age|
+----+
|  23|
|  24|
|null|
|  34|
|  29|
+----+


scala> 



scala> df.select("name","age").show()
+----+----+
|name| age|
+----+----+
|Ravi|  23|
|Rani|  24|
|Mani|null|
|Vani|  34|
|Veni|  29|
+----+----+


scala> df.select("name","sex").show()
+----+----+
|name| sex|
+----+----+
|Ravi|   M|
|Rani|   F|
|Mani|   M|
|Vani|null|
|Veni|   F|
+----+----+


scala> 


scala> df.select(df("name"),df

("age")+10).show()
+----+----------+
|name|(age + 10)|
+----+----------+
|Ravi|        33|
|Rani|        34|
|Mani|      null|
|Vani|        44|
|Veni|        39|
+----+----------+

scala> df.filter(df("age")<34).show()
+---+----+---+
|age|name|sex|
+---+----+---+
| 23|Ravi|  M|
| 24|Rani|  F|
| 29|Veni|  F|
+---+----+---+

scala> df.filter(df("age")>=5 && df("age")

<30).show()
+---+----+---+
|age|name|sex|
+---+----+---+
| 23|Ravi|  M|
| 24|Rani|  F|
| 29|Veni|  F|
+---+----+---+

scala> df.groupBy("age").count().show()
+----+-----+                                  

                                  
| age|count|
+----+-----+
|  34|    1|
|null|    1|
|  23|    1|
|  24|    1|
|  29|    1|
+----+-----+


scala> df.groupBy("sex").count().show()
+----+-----+                                  

                                  
| sex|count|
+----+-----+
|   F|    2|
|   M|    2|
|null|    1|
+----+-----+


scala> 
scala> df.registerTempTable("df")

scala> sqlContext.sql("select * from 

df").collect.foreach(println)

[23,Ravi,M]
[24,Rani,F]
[null,Mani,M]
[34,Vani,null]
[29,Veni,F]

scala> val mm = sqlContext.sql("select * from 

df")
mm: org.apache.spark.sql.DataFrame = [age: 

bigint, name: string, sex: string]

scala> mm.registerTempTable("mm")

scala> sqlContext.sql("select * from 

mm").collect.foreach(println)
[23,Ravi,M]
[24,Rani,F]
[null,Mani,M]
[34,Vani,null]
[29,Veni,F]

scala> mm.show()
+----+----+----+
| age|name| sex|
+----+----+----+
|  23|Ravi|   M|
|  24|Rani|   F|
|null|Mani|   M|
|  34|Vani|null|
|  29|Veni|   F|
+----+----+----+


scala> val x = mm
x: org.apache.spark.sql.DataFrame = [age: 

bigint, name: string, sex: string]

scala> 

scala> val aggr1 = df.groupBy("sex").agg( max

("age"), min("age"))
aggr1: org.apache.spark.sql.DataFrame = [sex: 

string, max(age): bigint, min(age): bigint]

scala> aggr1.collect.foreach(println)
[F,29,24]                                     

                                  
[M,23,23]
[null,34,34]

scala> aggr1.show()
+----+--------+--------+                      

                                  
| sex|max(age)|min(age)|
+----+--------+--------+
|   F|      29|      24|
|   M|      23|      23|
|null|      34|      34|
+----+--------+--------+


scala> 

____________________

  ex:
[cloudera@quickstart ~]$ cat > emp1
101,aaa,30000,m,11
102,bbbb,40000,f,12
103,cc,60000,m,12
104,dd,80000,f,11
105,cccc,90000,m,12
[cloudera@quickstart ~]$ cat > emp2
201,dk,90000,m,11
202,mm,100000,f,12
203,mmmx,80000,m,12
204,vbvb,70000,f,11
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp1 sparklab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp2 sparklab
[cloudera@quickstart ~]$ 

scala> val emp1 = sc.textFile

("/user/cloudera/sparklab/emp1")


scala> val emp2 = sc.textFile

("/user/cloudera/sparklab/emp2")


scala> case class Emp(id:Int, name:String, 
     |    sal:Int, sex:String, dno:Int)


scala> def toEmp(x:String) = {
     |   val  w = x.split(",")
     |   Emp(w(0).toInt, 
     |        w(1), w(2).toInt, 
     |       w(3), w(4).toInt)
     | }
toEmp: (x: String)Emp

scala> val e1 = emp1.map(x => toEmp(x))
e1: org.apache.spark.rdd.RDD[Emp] = 

MapPartitionsRDD[43] at map at <console>:37

scala> val e2 = emp2.map(x => toEmp(x))
e2: org.apache.spark.rdd.RDD[Emp] = 

MapPartitionsRDD[44] at map at <console>:37

scala> 

scala> val df1 = e1.toDF
df1: org.apache.spark.sql.DataFrame = [id: 

int, name: string, sal: int, sex: string, 

dno: int]

scala> val df2 = e2.toDF
df2: org.apache.spark.sql.DataFrame = [id: 

int, name: string, sal: int, sex: string, 

dno: int]

scala> 

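scala> df1.registerTempTable("df1")

scala> df2.registerTempTable("df2")

scala> val df = sqlContext.sql("select * from df1 union all  select * from df2")

scala> df.registerTempTable("df")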


scala> val res = sqlContext.sql("select sex,  

   sum(sal) as tot, count(*) as cnt 
   from df group by sex")

scala> 
scala> val wrres = res.map(x => x(0)+","+x

(1)+","+x(2))

scala> wrres.saveAsTextFile

("/user/cloudera/mytemp")

scala> hq.sql("create database park")
scala> hq.sql("use park")
scala> hq.sql("create table urres(sex string, 
   tot int, cnt int) 
   row format delimited 
    fields terminated by ',' ")

scala> hq.sql("load data inpath 

'/user/cloudera/mytemp/part-00000' into table 

 urres ")

scala> val hiveres = hq.sql("select * from 

urres")

scala> hiveres.show()
_____________________________________



































Tuesday 16 August 2016

Flume Lab

***************************om aim hreem sreem **************************
sreeram hadoop notes
sreeram flume notes for practice



conf/agent1.conf
_______________________

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


# Describe the sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/myFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

############################################
submit above flow using following command

[cloudera@quickstart ~]$ hadoop fs -mkdir myFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent1.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls myFlume

note: By default sink will write in  sequence format

The risk in the above flow is that if the channel fails or the channel's system goes down,
 the data will be lost. To provide fault tolerance for the channel, use the following flow.

***********************om aim hreem sreem***********************************

sreeram hadoop notes:
sreeram flume notes for practice:

conf/agent2.conf
___________________

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1 s2 
a1.channels = c1 c2

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


# Describe the sinks
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume

a1.sinks.s2.type = hdfs
a1.sinks.s2.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1200
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1 c2
a1.sinks.s1.channel = c1
a1.sinks.s2.channel = c2

##################################

[cloudera@quickstart ~]$ hadoop fs -mkdir urFlume
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent2.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls urFlume

In the above flow, src1 writes into both channels, c1 and c2,
  so if one channel fails, the data is still available in the other.
But if no failure happens, the data will be duplicated in the target Hadoop directory,
so before processing the data, we need to eliminate the duplicate records.

******************om aim hreem sreem**********************************

sreeram hadoop notes:
sreeram flume notes for practice:
Task --> importing from Hive Table

conf/agent3.conf
______________________________
# agent3.conf: A single-node Flume configuration that reads from a Hive table

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=hive -e 'select * from mraw'


# Describe the sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/ourFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

#############################################

[cloudera@quickstart ~]$ hive
hive> create table mraw(line string);
OK
Time taken: 2.218 seconds
hive> load data local inpath 'f1' into table mraw;
hive> select * from mraw limit 5;
OK
aaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
Time taken: 0.341 seconds, Fetched: 5 row(s)
hive> 

[cloudera@quickstart ~]$ hadoop fs -mkdir ourFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent3.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls ourFlume

In all the above cases, the output will be in SequenceFile format.

*****************************om aim hreem sreem ***************************

sreeram hadoop notes:
sreeram flume notes for practice:

conf/agent4.conf
________________________________




# agent4.conf: A single-node Flume configuration that writes plain-text output

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


# Describe the sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/naFlume


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

######################################

The above flow will write its output in text format.

[cloudera@quickstart ~]$ hadoop fs -mkdir naFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent4.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls naFlume

[cloudera@quickstart ~]$ hadoop fs -cat naFlume/FlumeData.1471351131067
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
ddddddddddddddddddddddddddddd
ccccccccccccccccccccccccccccccc
cccccccccccccccccccccccccccccc
ccccccccccccccccccccccccccccccc

The above output is in text format.

*******************om aim hreem  sreem*****************************