Saturday, 8 April 2017

Spark SQL with JSON and XML processing

-----------


   Spark SQL
---------------

     Spark SQL is a library
   to process Spark data objects
   using SQL select statements.

 Spark SQL follows a MySQL-like SQL syntax.
==============================================
 Spark SQL provides
   two types of contexts:
  i) SQLContext
 ii) HiveContext

import org.apache.spark.sql.SQLContext

    val sqlCon = new SQLContext(sc)

Using SQLContext,
   we can process Spark objects using select statements.

 Using HiveContext,
   we can integrate Hive with Spark.
 Hive is a data warehouse environment in the Hadoop framework,
   so data is stored and managed in Hive tables.
  Using HiveContext we can access the entire Hive environment (Hive tables) from Spark.

  Difference between an HQL statement run from Hive
  and an HQL statement run from Spark:
--> if HQL is executed from the Hive environment,
  the statement is converted
 into a MapReduce job.
 ---> if the same Hive is integrated with Spark,
  and the HQL is submitted from Spark,
    it uses the DAG and in-memory computing models,
  which are much faster than MapReduce.

 import org.apache.spark.sql.hive.HiveContext

 val hc = new HiveContext(sc)
-----------------------------
 Example of SQLContext.

 val sqc = new SQLContext(sc)
 import sqc.implicits._   // needed later for rdd.toDF

  file name --> file1
  sample --->  100,200,300
               300,400,400
                :
                :
 step1)
  create case class for the data.
 
   case class  Rec(a:Int, b:Int, c:Int)

 step2) create a function
   to convert a raw line into a case object.
  [a function that provides the schema]

 def makeRec(line:String)={
        val w = line.split(",")
        val a = w(0).toInt
        val b = w(1).toInt
        val c = w(2).toInt
        val r = Rec(a, b,c)
         r
    }
--------
step3) load data.
   val data = sc.textFile("/user/cloudera/sparklab/file1")

   100,200,300
   2000,340,456
    :
    :


step4) transform each record into case Object

  val recs = data.map(x => makeRec(x))

step5) convert the rdd into a data frame.
 
  val df = recs.toDF

step6)  register the dataframe as a temp table.

   df.registerTempTable("samp")

step7) apply select statement of sql on temp table.

val  r1 = sqc.sql("select a+b+c as tot from samp")

    r1
  ------
   tot
   ----
   600
   900

  r1.registerTempTable("samp1")

val r2 =    sqc.sql("select sum(tot) as gtot from samp1")


   Once a "select" statement is applied on a
 temp table, the returned object is a DataFrame.


 To apply SQL on processed results,
    we again need to register that DataFrame
  as a temp table.

 r1.registerTempTable("samp2")

 val r3 = sqc.sql("select * from samp2 where tot >= 200")

-----------------------------------
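 As a side note, the returned DataFrames can also be processed with the
 DataFrame API instead of registering temp tables; a minimal sketch
 (assuming the same df and sqc as above, with the column names from the
 Rec case class):

 // same result as "select a+b+c as tot from samp" and its grand total,
 // expressed with the DataFrame DSL
 val totDf = df.selectExpr("a + b + c as tot")
 val grand = totDf.agg(Map("tot" -> "sum"))
 totDf.show
 grand.show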

         sales
   --------------------
     :
   12/27/2016,10000,3,10
     :
     :

  -------------------------
 Steps involved in Spark SQL [SQLContext]
----------------------------
   monthly sales report...
schema ---> date, price, qnt, discount

  step1)
    case class Sales(mon : Int, price:Int,
             qnt :Int, disc: Int)

  step2)
    def toSales(line: String) = {
       val w = line.split(",")
       val mon = w(0).split("/")(0).toInt
       val p = w(1).toInt
       val q = w(2).toInt
       val d = w(3).toInt
       val srec = Sales(mon,p,q,d)
       srec
   }
step3)
 val data = sc.textFile("/user/cloudera/mydata/sales.txt")
step4)
  val strans = data.map(x => toSales(x))
step5)

  val sdf = strans.toDF
  sdf.show

step6)

  sdf.registerTempTable("SalesTrans")

step7) // play with select
  ---> mon, price, qnt, disc

 val res1 = sqlContext.sql("""select mon,
             sum(
               (price - price*disc/100) * qnt
             ) as tsales from SalesTrans
             group by mon""")

 res1.show
 res1.printSchema

-----------------------------------------

 val res2 = res1

 res1.registerTempTable("tab1")
 res2.registerTempTable("tab2")

 val res3 = sqlContext.sql("""select l.mon as m1,
     r.mon as m2, l.tsales as t1,
     r.tsales as t2
     from tab1 l join tab2 r
     where (r.mon - l.mon) = 1""")
//  11 rows.
 res3.registerTempTable("tab3")

------------------------------
 val res4 = sqlContext.sql("""select
    m1, m2, t1, t2, ((t2 - t1)*100)/t1 as sgrowth
    from tab3""")

  res4.show()
------------------------------------------
  json1.json
--------------------------
 {"name":"Ravi","age":25,"city":"Hyd"}
 {"name":"Rani","sex":"F","city":"Del"}
   :
   :
---------------------------------------

    val df = sqlContext.read.json("/user/cloudera/mydata/json1.json")

   df.show
 ------------------------
  name    age     sex     city
 ----------------------------------------
  Ravi    25      null    Hyd
  Rani    null    F       Del
    :
    :
--------------------------------------
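 The JSON-backed DataFrame can be queried with SQL just like the earlier
 examples; a small sketch (the temp table name "people" is illustrative):

 df.registerTempTable("people")
 val r = sqlContext.sql("select name, city from people where age is not null")
 r.show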

 json2.json
 ------------------
 {"name":"Ravi","age":25,"wife":{"name":"Rani","age":23},"city":"Hyd"}
   :
   :

 val df2 = sqlContext.read.json("/../json2.json")

  df2
-----------------------
name     age      wife                       city
Ravi     25       {"name":"Rani","age":23}   Hyd
    :
---------------------

 df2.registerTempTable("Info")

val df3 = sqlContext.sql("""select name,
           wife.name as wname,
           age, wife.age as wage,
           abs(age - wife.age) as diff,
           city from Info""")
----------------------------------------
 XML data processing with Spark SQL.

--- Spark SQL does not have direct libraries
  for XML processing.
   There are two ways:
     i) a 3rd-party API [ex: the Databricks spark-xml package; see the sketch below]
    ii) using Hive integration.

 The 2nd approach is preferred here.
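 For completeness, a minimal sketch of option (i), assuming the spark-xml
 package (com.databricks:spark-xml for Spark 1.x) has been added to the
 shell with --packages; the rowTag value matches the <rec> records used
 below, and the HDFS path is illustrative:

 val xdf = sqlContext.read
             .format("com.databricks.spark.xml")
             .option("rowTag", "rec")
             .load("/user/cloudera/mydata/xml1.xml")
 xdf.printSchema
 xdf.show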

How to integrate Hive with spark .
 ---Using HiveContext.


step1)
   copy the hive-site.xml file into
     the /usr/lib/spark/conf directory.

What if hive-site.xml is not copied into
    Spark's conf directory?
--- Spark cannot find
  Hive's metastore location [derby/mysql/oracle ....];
    this info is available in hive-site.xml.


 step2)

  create hive Context object

import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)

  step3)  access Hive Environment from spark

 hc.sql("create database mydb")
 hc.sql("use mydb")
 hc.sql("create table samp(line string)")
 hc.sql("load data local inpath 'file1' into table samp")
 val df = hc.sql("select * from samp")
-------------------------------------
   xml1.xml
----------------------------------
<rec><name>Ravi</name><age>25</age></rec>
<rec><name>Rani</name><sex>F</sex></rec>
 :
 :
------------------------------------------

 hc.sql("create table raw(line string)")
 hc.sql("load data local inpath 'xml1.xml' into table raw")
 hc.sql("create table info(name string, age int, sex string)")
 hc.sql("""insert overwrite table info
    select xpath_string(line, 'rec/name'),
           xpath_int(line, 'rec/age'),
           xpath_string(line, 'rec/sex')
    from raw""")
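 The xpath UDFs can be sanity-checked on a literal string before running
 them over a table; for example, this should return "Ravi":

 hc.sql("select xpath_string('<rec><name>Ravi</name></rec>', 'rec/name')").show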
----------------------------------------
 xml2.xml
------------
<rec><name><fname>Ravi</fname><lname>kumar</lname><age>24</age><contact><email><personal>ravi@gmail.com</personal><official>ravi@ventech.com</official></email><phone><mobile>12345</mobile><office>123900</office><residence>127845</residence></phone></contact><city>Hyd</city></rec>

hc.sql("create table xraw(line string)")
hc.sql("load data local inpath 'xml2.xml' into table xraw")
hc.sql("""create table xinfo(fname string,
   lname string, age int,
   personal_email string,
   official_email string,
   mobile string,
   office_phone string,
   residence_phone string,
   city string)""")

hc.sql("""insert overwrite table xinfo
  select
     xpath_string(line,'rec/name/fname'),
     xpath_string(line,'rec/name/lname'),
     xpath_int(line,'rec/age'),
     xpath_string(line,'rec/contact/email/personal'),
     xpath_string(line,'rec/contact/email/official'),
     xpath_string(line,'rec/contact/phone/mobile'),
     xpath_string(line,'rec/contact/phone/office'),
     xpath_string(line,'rec/contact/phone/residence'),
     xpath_string(line,'rec/city')
  from xraw""")
-------------------------
 xml3.xml
----------------
<tr><cid>101</cid><pr>200</pr><pr>300</pr><pr>300</pr></tr>
<tr><cid>102</cid><pr>400</pr><pr>800</pr></tr>
<tr><cid>101</cid><pr>1000</pr></tr>
--------------------------------

hc.sql("create table sraw(line string)")
hc.sql("load data local inpath 'xml3.xml' into table sraw")
hc.sql("create table raw2(cid int, pr array<string>)")

hc.sql("""insert overwrite table raw2
   select xpath_int(line, 'tr/cid'),
          xpath(line, 'tr/pr/text()')
   from sraw""")
hc.sql("select * from raw2").show
-------------------------------
cid        pr
101        [200,300,300]
102        [400,800]
101        [1000]

hc.sql("select explode(pr) as price from  raw2").show

    200
    300
    300
    400
    800
   1000

hc.sql("select cid, explode(pr) as price from  raw2").show

----> the above is invalid: explode() cannot be mixed with other
      columns in the select list; use LATERAL VIEW instead (below).

hc.sql("create table raw3(cid int, pr int)")
hc.sql("""insert overwrite table raw3
    select cid, mypr from raw2
    lateral view explode(pr) p as mypr""")

hc.sql("select * from raw3").show

cid     pr
101     200
101     300
101     300
102     400
102     800
101    1000

hc.sql("create table summary(cid int, totbill bigint)")

hc.sql("""insert overwrite table summary
   select cid, sum(pr) from raw3
   group by cid""")
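To verify the result (from the sample rows above, cid 101 should total
1800 and cid 102 should total 1200):

hc.sql("select * from summary").show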

--------------------

Friday, 31 March 2017

Linear Regression with SGD

[cloudera@quickstart ~]$ gedit prof
[cloudera@quickstart ~]$ hadoop fs -mkdir mlib
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prof mlib
[cloudera@quickstart ~]$

scala> val data = sc.textFile("/user/cloudera/mlib/prof")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/mlib/prof MapPartitionsRDD[1] at textFile at <console>:27

scala> data.collect.take(3).foreach(println)
"a","w","h","c"
25,80,5.9,120
23,55,5.7,90


scala> val ndata = data.filter{x =>
     |     !(x.split(",")(0).contains("a"))
     | }


scala> ndata.collect.foreach(println)
25,80,5.9,120
23,55,5.7,90
23,89,6.0,130
26,80,5.9,120
23,55,5.7,90
23,69,6.0,130
28,81,5.9,120
23,55,5.9,190
23,81,6.0,130
29,87,5.9,120
23,55,5.7,190
23,89,5.0,130

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionModel

scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

-------------
scala> def toLabel(line: String) = {
     |      val w = line.split(",")
     |      val lbl = w(3).toDouble
     |      val f = w.take(3).map(x => x.toDouble)
     |      LabeledPoint(lbl, Vectors.dense(f))
     |    }
toLabel: (line: String)org.apache.spark.mllib.regression.LabeledPoint

scala> toLabel("23,78,5.9,120")
res8: org.apache.spark.mllib.regression.LabeledPoint = (120.0,[23.0,78.0,5.9])

scala> val trainset = ndata.map(x => toLabel(x))
trainset: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[3] at map at <console>:37

scala>

scala> trainset.collect.foreach(println)
(120.0,[25.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,6.0])
(120.0,[26.0,80.0,5.9])
(90.0,[23.0,55.0,5.7])
(130.0,[23.0,69.0,6.0])
(120.0,[28.0,81.0,5.9])
(190.0,[23.0,55.0,5.9])
(130.0,[23.0,81.0,6.0])
(120.0,[29.0,87.0,5.9])
(190.0,[23.0,55.0,5.7])
(130.0,[23.0,89.0,5.0])

scala>


val numIterations = 100
val model = LinearRegressionWithSGD.train(trainset, numIterations)


val valuesAndPreds = trainset.map { x =>
  val prediction = model.predict(x.features)
  (x.label, prediction)
}

// valuesAndPreds contains (y, ycap) pairs:
//   y is the actual label and ycap is the predicted label.
//   [ label means response variable ]


scala> valuesAndPreds.collect.foreach(println)
(120.0,-7.150280334821135E301)
(90.0,-5.078652953403039E301)
(130.0,-7.824818198048878E301)
(120.0,-7.176538392878548E301)
(90.0,-5.078652953403039E301)
(130.0,-6.210523036282773E301)
(120.0,-7.309769267081678E301)
(190.0,-5.07989526649868E301)
(130.0,-7.179100133342436E301)
(120.0,-7.820315873668922E301)
(190.0,-5.078652953403039E301)
(130.0,-7.818606632570674E301)


val mse = valuesAndPreds.map{ x =>
       val y = x._1.toInt
       val ycap = x._2.toInt
       val e = y - ycap
       e*e
     }.mean

Continue the trials by increasing the
  number of iterations until you reach
convergence  [[ the MSE stops changing ]].
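The huge predicted values above (on the order of 1E301) are a sign that
SGD diverged on unscaled features. A minimal sketch, assuming the same
trainset and imports as above, that standardises the features and uses a
smaller step size before retraining (scaledTrainset and stepSize are
illustrative names):

import org.apache.spark.mllib.feature.StandardScaler

val scaler = new StandardScaler(withMean = true, withStd = true)
               .fit(trainset.map(_.features))
val scaledTrainset = trainset.map(p =>
  LabeledPoint(p.label, scaler.transform(p.features)))

val stepSize = 0.01   // smaller learning rate than the default 1.0
val model2 = LinearRegressionWithSGD.train(scaledTrainset, numIterations, stepSize)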




val acc = valuesAndPreds.map { x =>
  val y = x._1.toInt
  val ycap = x._2.toInt
  val dist = ((y - ycap) * 100) / y
  val stat = if (dist >= -20 && dist <= 20) "Pass" else "Fail"
  (stat, 1)
}

val accres = acc.reduceByKey(_+_)

---------------------------------

If the accuracy is satisfactory,
   apply the model to the predictables (live data):

   model.predict(<dense vector>)

   The dense vector should contain
      only the features.
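For example, with hypothetical feature values given in the same
(a, w, h) order used for training:

   model.predict(Vectors.dense(24.0, 70.0, 5.8))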


-----------------------------------

Thursday, 30 March 2017

R and Analytics Basics3

sankara.deva2016@gmail.com

bharat sreeram, Ventech It Solutions.
-----------------------------------------------------------

# Linear Regression Implementation in R
---------------

df = read.csv("C:/Users/Hadoop/Desktop/prof.txt")

> df
    a  w   h   c
1  25 80 5.9 120
2  23 55 5.7  90
3  23 89 6.0 130
4  26 80 5.9 120
5  23 55 5.7  90
6  23 69 6.0 130
7  28 81 5.9 120
8  23 55 5.9 190
9  23 81 6.0 130
10 29 87 5.9 120
11 23 55 5.7 190
12 23 89 5.0 130





# Normal equation: beta = (X'X)^-1 X'Y
Y = matrix(df$c, ncol=1)

Xdf = data.frame(beta = 1, a=df$a, w=df$w,
     h=df$h)           # column of 1s for the intercept
X = data.matrix(Xdf)

Xt = t(X)              # transpose of X
XtX = Xt %*% X
inv = solve(XtX)       # inverse of X'X
XtY = Xt %*% Y
beta = inv %*% XtY     # estimated coefficients

-----------------------

model = lm(c ~ a + w + h , data=df)
-----------------------

Model testing: [accuracy testing ]

> x
 [1]  10  20  30  40  23  45  67  90 100 150
> y
 [1]  23  45  67  90  50 100 130 190 210 270
> cor(x,y)
[1] 0.9929889
> df = data.frame(x, y)
> lmfit = lm(y ~ x, data=df)

> params = coefficients(lmfit)
> a = params[1]
> b = params[2]

> test = data.frame(y=df$y)
> test$ycap = a + (b*df$x)
> test$dist = ((test$ycap-test$y)*100)/test$y

> test$res = "Fail"
> test$res[test$dist>=-20 & test$dist<=20] = "Pass"
> prop.table(table(test$res))


--------------------------------

non-linear regression implementation:

> dframe = data.frame(x = df$x,
+                     xsq = df$x^2,
+                     xcube = df$x^3,
+                     y = df$y)
> nlmfit = lm(y ~ x + xsq + xcube, data=dframe)
> p = coefficients(nlmfit)
> b0 = p[1]
> b1 = p[2]
> b2 = p[3]
> b3 = p[4]

> newtest = data.frame(y = dframe$y)
> newtest$ycap = b0 + (b1*dframe$x) +
+                      (b2 * dframe$xsq) +
+                      (b3 * dframe$xcube)

> newtest$dist = ((newtest$ycap-newtest$y)*100)/newtest$y

> newtest$res = "Fail"
> newtest$res[newtest$dist>=-20 & 
          newtest$dist<=20] = "Pass"
> prop.table(table(newtest$res))

R and Analytics Basics 2

sankara.deva2016@gmail.com

bharat sreeram , Ventech It Solutions
-----------------------------------------------------------------------------------------------

> info = read.csv("C:/Users/Hadoop/Desktop/info.txt")
> info
   id    name   sal sex city
1 101    Amar 40000   m  hyd
2 102   Amala 50000   f  del
3 103   sunil 70000   m  hyd
4 104 sunitha 80000   f  hyd
5 105  ankith 90000   m  del
6 106 ankitha 60000   f  hyd
>
> class(info)
[1] "data.frame"
> str(info)
 -- structure of info.
'data.frame':   6 obs. of  5 variables:
 $ id  : int  101 102 103 104 105 106
 $ name: Factor w/ 6 levels "Amala","Amar",..: 2 1 5 6 3 4
 $ sal : int  40000 50000 70000 80000 90000 60000
 $ sex : Factor w/ 2 levels "f","m": 2 1 2 1 2 1
 $ city: Factor w/ 2 levels "del","hyd": 2 1 2 2 1 2
>
> info$name
[1] Amar    Amala   sunil   sunitha ankith  ankitha
Levels: Amala Amar ankith ankitha sunil sunitha
> info$sal
[1] 40000 50000 70000 80000 90000 60000
>
-------------------
Updating Column:
 info$sal = info$sal+1000
Generating new Field for Data frame.
 info$tax = info$sal*10/100
  # conditional transformation,.
 info$grade = "C"
 info$grade[info$sal>=50000 & info$sal<=80000]    = "B"
 info$grade[info$sal>80000]="A"
> info
   id    name   sal sex city  tax grade
1 101    Amar 41000   m  hyd 4100     C
2 102   Amala 51000   f  del 5100     B
3 103   sunil 71000   m  hyd 7100     B
4 104 sunitha 81000   f  hyd 8100     A
5 105  ankith 91000   m  del 9100     A
6 106 ankitha 61000   f  hyd 6100     B
>
Grouping Aggregations:
--------------------------------
 res1 =
   aggregate( sal ~ sex, data=info, FUN=sum)
> res1
  sex    sal
1   f 193000
2   m 203000
>

 res2 =
   aggregate( sal ~ sex, data=info, FUN=mean)
> res2
  sex      sal
1   f 64333.33
2   m 67666.67

  res3 =
   aggregate( sal ~ sex, data=info, FUN=max)
> res3
  sex   sal
1   f 81000
2   m 91000

 res4 =
   aggregate( sal ~ sex, data=info, FUN=min)
> res4
  sex   sal
1   f 51000
2   m 41000


 res5 =
   aggregate( sal ~ sex, data=info, FUN=length)
> res5
  sex sal
1   f   3
2   m   3

---------------------------
 res6 = aggregate(sal ~ grade, data=info,
   FUN = length)
> res6
  grade sal
1     A   2
2     B   3
3     C   1

-----------------------------------
  select sex, grade, sum(sal) from info
    group by sex, grade;
Grouping by multiple columns of data frame.
 res7 =
   aggregate( sal ~ sex + grade, data=info, FUN=sum)
> res7
  sex grade    sal
1   f     A  81000
2   m     A  91000
3   f     B 112000
4   m     B  71000
5   m     C  41000

-----------------------------------------

 select sex, sum(sal), max(sal),
      min(sal), avg(sal), count(*)
   from info group by sex;
# performing multiple aggregation,
   of each data group.

Performing Multiple Aggregations:
r1 = aggregate(sal ~ sex , data=info , FUN=sum)
r2 = aggregate(sal ~ sex , data=info , FUN=max)
r3 = aggregate(sal ~ sex , data=info , FUN=min)
r4 = aggregate(sal ~ sex , data=info , FUN=mean)
r5 = aggregate(sal ~ sex , data=info , FUN=length)
resall = data.frame(sex=r1$sex,
          tot=r1$sal, avg=r4$sal,
         max= r2$sal, min=r3$sal,
        cnt=r5$sal)
> resall
  sex    tot      avg   max   min cnt
1   f 193000 64333.33 81000 51000   3
2   m 203000 67666.67 91000 41000   3
>
--------------------------------------
Working With Matrices.
 v = 1:15
  m1 = matrix(v, nrow=3)
 m2 = matrix(v, nrow=3 ,byrow=T)
 m3 = matrix(v, ncol=5, byrow=T)
 m4 = matrix(1:12,4)
 Operations of matrices.
 m = matrix(c(10,20,30,40), nrow=2)
 mx = matrix(c(12,34,56,23), nrow=2)
 r1 = m + mx   # element to element sum
 r2 = m * mx   #   "        "       multiplication
 r3 = m %*% mx  # matrix multiplication
 r4 = solve(m) # inverse of matrix.
 r5 = t(m) # transpose of matrix.
-------------------------------------------

R and Analytics Basics 1

sankara.deva2016@gmail.com

Bharat sreeram, Ventech It Solutions
-----------------------------------------------------------


 ls()
--- lists the R objects.
rm("x")
-- removes an object.
-----------------------------------------
 a <- 100
 a = 100
 b = 200
 c = a + b
 d = a^3
 e = a**2
 name = "Giri"
  a%%2 ---> remainder.
----------------------------------
>  x = c (10,20,30,40,23,45,67,90)
> x[1] # first element
> length(x) # number of elements
> x[length(x)] # last element
> x[1:3] # first 3 elements
> x[4:length(x)] # 4th onwards
> x[9]=100 # adding 9 th element
> x[length(x)+1]=150 # adding element to last.
--------------------------------
> x
 [1]  10  20  30  40  23  45  67  90 100 150
> y = x + 10
> y
 [1]  20  30  40  50  33  55  77 100 110 160
>
 z = c(1,2,3,4,5)
 u = x + z
 # element-to-element arithmetic operation will happen.
 # if the numbers of elements are different,
 # the shorter vector is recycled until all elements of
 # the bigger vector are covered.
> sum(x)
[1] 575
> mean(x)
[1] 57.5
> length(x)
[1] 10
> max(x)
[1] 150
> min(x)
[1] 10
> var(x)
[1] 1953.389
> sd(x)
[1] 44.19716
>
-------------------------------------
gm = function(x){
  v = 1
  for (i in x){
     v = v * i
  }
  n = length(x)
  v^(1/n)
 }
# last expression will be returned.
------------------------------------
 hm = function(x) {
     xx = 1/x
     sxx = sum(xx)
     n = length(x)
     n/sxx
 }
--------------------------------
# function for variance of population
 vpop = function(x){
   xbar = mean(x)
   dev = x - xbar
   sumdsq = sum( dev^2)
    n = length(x)
    sumdsq/n
 }
 # function for standard deviation of Population.
 sdpop = function(x) {
    sqrt(vpop(x))
 }
--------------
 a = 1:10
 # generates auto sequence number,
   incremented by 1.
 b = 10:1
 c = seq(10,100,5)
 d = seq(100,10,-5)
-----------------------------------------

Tuesday, 27 September 2016

MR Lab9 : Mapper Only Functionalities


Mapper Only functionality.
--------------------------

  row filter:

    ex: select * from emp where sex = 'm';

  for this, a reducer is not required.

     we need to suppress the reducer:
   ex:
        j.setNumReduceTasks(0);

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilterMap extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select * from emp where sex ="m";
      String line = v.toString();
      String[] w = line.split(",");
      String sex = w[3];
      if(sex.matches("m"))
         con.write( v , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(RowFilterMap.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop fs -cat mrlab/emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/males

[training@localhost ~]$ hadoop fs -ls mrlab/males
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-09-27 06:57 /user/training/mrlab/males/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-09-27 06:57 /user/training/mrlab/males/_logs
-rw-r--r--   1 training supergroup         60 2016-09-27 06:57 /user/training/mrlab/males/part-m-00000
[training@localhost ~]$ hadoop fs -cat mrlab/males/part-m-00000
101,vino,26000,m,11
103,mohan,13000,m,13
105,naga,6000,m,13
[training@localhost ~]$

-------------------------------------------
 RowFilter 2:

   on unstructured Text:

[training@localhost ~]$ cat > news
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData           
Pakistan Jurking India
BigData is a hipe or real
[training@localhost ~]$ hadoop fs -copyFromLocal news mrlab
[training@localhost ~]$

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilter2 extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select * from News where
     //   contains(upcase(line),'BIGDATA');
     
      String line = v.toString().toUpperCase();
     
      if(line.contains("BIGDATA") ||
               line.contains("BIG DATA"))
         con.write( v , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(RowFilter2.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/news  mrlab/bignews

[training@localhost ~]$ hadoop fs -cat mrlab/bignews/part-m-00000
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData
BigData is a hipe or real
[training@localhost ~]$

--------------------------------

 Column Filter.

 ex:
     select name, sal, dno  from emp;


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnFilter extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select name,sal,dno from emp;
      String line = v.toString();
      String[] w = line.split(",");
      String newLine = w[1]+","+
                       w[2]+","+w[4];
      con.write( new Text(newLine) , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(ColumnFilter.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/cfilter

[training@localhost ~]$ hadoop fs -cat mrlab/cfilter/part-m-00000
vino,26000,11
Sri,25000,11
mohan,13000,13
lokitha,8000,12
naga,6000,13
janaki,10000,12
[training@localhost ~]$

-----------------------------------------

Generating new Fields

 hive>
   select id, name, sal, sal*0.1 as tax,
            sal*0.2 as hra,
          sal-(sal*0.1)+(sal*0.2) as net,
        sex, dno from emp;


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GenerateNewFields extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      int sal = Integer.parseInt(w[2]);
      int tax = sal*10/100;
      int hra = sal*20/100;
      int net = sal-tax+hra;
      String newLine =w[0]+","+w[1]+","+sal+","+
                     tax+","+hra+","+net+","+
                      w[3]+","+w[4];
     
     con.write( new Text(newLine) , NullWritable.get());
        
   }
}


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(GenerateNewFields.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/newFields

[training@localhost ~]$ hadoop fs -cat mrlab/newFields/part-m-00000
101,vino,26000,2600,5200,28600,m,11
102,Sri,25000,2500,5000,27500,f,11
103,mohan,13000,1300,2600,14300,m,13
104,lokitha,8000,800,1600,8800,f,12
105,naga,6000,600,1200,6600,m,13
101,janaki,10000,1000,2000,11000,f,12
[training@localhost ~]$
-----------------------------------
transformations

hive> select id, name, sal ,
    >  if(sal>=70000,'A',
    >     if(sal>=50000,'B',
    >      if(sal>=30000,'C','D'))) as grade,
    > if(sex='m','Male','Female') as sex,
    > if(dno=11,'Marketing',
    >  if(dno=12,'hr',
    >   if(dno=13,'Finance','Other'))) as dname
    > from emp;


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Transform extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      int sal = Integer.parseInt(w[2]);
      String sex = w[3];
      int dno = Integer.parseInt(w[4]);
      String grade;
      if(sal>=70000)
           grade="A";
      else if(sal>=50000)
          grade="B";
      else if(sal>=30000)
          grade="C";
      else grade="D";
     
      if(sex.matches("m"))
          sex="Male";
      else sex="Female";
      String dname;
      switch(dno)
      {
      case 11:
           dname="Marketing";
           break;
      case 12:
           dname="Hr";
           break;
      case 13:
           dname="Finance";
           break;
      default:
           dname="Other";
      }

      String newLine = w[0]+","+
      w[1]+","+sal+","+grade+","+sex+","+dname;
      con.write(new Text(newLine), NullWritable.get());
      }
}


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(Transform.class);
 // j.setReducerClass(RedForNoDupeKeys.class);
   j.setNumReduceTasks(0);
 //j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}


[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/transform


[training@localhost ~]$ hadoop fs -cat mrlab/transform/part-m-00000
101,vino,26000,D,Male,Marketing
102,Sri,25000,D,Female,Marketing
103,mohan,13000,D,Male,Finance
104,lokitha,8000,D,Female,Hr
105,naga,6000,D,Male,Finance
101,janaki,10000,D,Female,Hr
[training@localhost ~]$

MR Lab 8 : Entire Column Aggregations, Elimination of Duplicates

performing Entire Column aggregation.

ex:
   select sum(sal) from emp;

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class SalMap extends Mapper<LongWritable,Text,Text,IntWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      int sal = Integer.parseInt(
                  line.split(",")[2]
                                   );
      con.write( new Text("Ibm"), new IntWritable(sal));
   }
}
------------


package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(SalMap.class);
  j.setReducerClass(RedForSum.class);
 //j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(IntWritable.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}


[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/today1

[training@localhost ~]$ hadoop fs -cat mrlab/today1/part-r-00000
Ibm     88000

--------------------------------------------------

Eliminating duplicate rows, based on an entire-row match.

[training@localhost ~]$ cat > profiles
101,aaa
102,bbb
101,aaa
101,aaa
101,aaa
102,bbb
103,ccc
101,xxx
101,aaa
[training@localhost ~]$ hadoop fs -copyFromLocal profiles mrlab
[training@localhost ~]$


package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NoDupeRowsMap extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      con.write( v , NullWritable.get());
   }
}







---------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForNoDupeRows  extends Reducer<Text,NullWritable,Text,NullWritable>
{
    public void reduce(Text k,Iterable<NullWritable> vlist, Context con)
    throws IOException, InterruptedException
    {
        con.write(k, NullWritable.get());
    }

}

----
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(NoDupeRowsMap.class);
  j.setReducerClass(RedForNoDupeRows.class);
 //j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(NullWritable.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}




[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/profiles mrlab/today3



[training@localhost ~]$ hadoop fs -cat mrlab/today3/part-r-00000
101,aaa
101,xxx
102,bbb
103,ccc

---------------------------

eliminating duplicates based on a key-column match.



[training@localhost ~]$ hadoop fs -copyFromLocal profiles mrlab/profiles2
[training@localhost ~]$ hadoop fs -cat mrlab/profiles2
101,aaa
102,bbb
101,aaa
101,aaa
101,aaa
102,bbb
103,ccc
101,xxx
101,aaa
101,abc
101,bbc
102,def
[training@localhost ~]$



package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NoDupeKeysMap extends Mapper<LongWritable,Text,Text,Text>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      String id = w[0];
      con.write( new Text(id) , v);
   }
}

------------

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForNoDupeKeys  extends Reducer<Text,Text,Text,NullWritable>
{
    public void reduce(Text k,Iterable<Text> vlist, Context con)
    throws IOException, InterruptedException
    {
      for(Text line : vlist)
      {
          con.write(line,NullWritable.get());
          break;
      }
    }

}

----------

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colaggr");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(NoDupeKeysMap.class);
  j.setReducerClass(RedForNoDupeKeys.class);
 //j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(Text.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}





[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/profiles2  mrlab/today4


[training@localhost ~]$ hadoop fs -cat mrlab/today4/part-r-00000
101,aaa
102,bbb
103,ccc

------------
To get the last duplicate instead, make the following change in the reducer.

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForNoDupeKeys  extends Reducer<Text,Text,Text,NullWritable>
{
    public void reduce(Text k,Iterable<Text> vlist, Context con)
    throws IOException, InterruptedException
    {
        String line="";
      for(Text ln : vlist)
      {
          line = ln.toString();
 
      }
      con.write(new Text(line), NullWritable.get());
     
    }

}
--------


[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/profiles2  mrlab/today5


[training@localhost ~]$ hadoop fs -cat mrlab/today5/part-r-00000
101,bbc
102,def
103,ccc

-------------------------------