Data science Software Course Training in Ameerpet Hyderabad


Tuesday 1 August 2017

Hive (10 AM to 1:00 PM) Lab 1 notes : Hive Inner and External Tables

hive> create table samp1(line string);
-- here we did not select any database.
   The default database in Hive is "default".
 
  The HDFS location of the default database is
   /user/hive/warehouse

 -- when you create a table in the default database, a directory with the table name is created under the warehouse location.

   In HDFS, the
   /user/hive/warehouse/samp1 directory is created.

hive> create database mydb;

  when a database is created, a directory named after the database with the extension ".db" is created under the warehouse location.

 How to select a database:

hive> use mydb;

hive> create table test1(line string);

  under the mydb.db directory, a test1 table directory will be created:

 /user/hive/warehouse/mydb.db/test1

[cloudera@quickstart ~]$ ls file*
file1  file2  file3
[cloudera@quickstart ~]$ cat file1
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[cloudera@quickstart ~]$ cat file2
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb
[cloudera@quickstart ~]$ cat file3
cccccccccccccccccccc
ccccccccccccccccccc
ccccccccccccc
[cloudera@quickstart ~]$

hive> use default;
hive> load data local inpath 'file1'
      into table samp1;
-- when you load a file into a table,
    the file is copied into the table's backend directory.

   in hdfs,
  /user/hive/warehouse/samp1/file1

hive> load data local inpath 'file2'
   into table samp1;

  now the table directory has two files:
     file1 and file2.
hive> select * from samp1;
o/p:
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb


-- hive will read all rows of all files in the
   table directory.

Another way of loading a file into a table:

$ hadoop fs -copyFromLocal file3
          /user/hive/warehouse/samp1



hive> select * from samp1;
OK
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb
cccccccccccccccccccc
ccccccccccccccccccc
ccccccccccccc

hive> use mydb;
hive> show tables;
test1
hive> load data local inpath 'file1' into table test1;
hive>
in hdfs,
 /user/hive/warehouse/mydb.db/test1/file1

===============================

  Hive tables are basically of two types:

 1) Inner tables [user-managed / managed tables]
 2) External tables.

 When an inner table is dropped,
   both the metadata and the data (in HDFS) are deleted.

 When an external table is dropped,
   only the metadata is deleted;
   the data is still safely available in the table's backend HDFS location,

  so you can reuse the data in future.


Where is Hive table metadata stored?
  -- in an RDBMS,
    under the metastore database.

 When you submit a query in Hive,
   Hive contacts the metastore, identifies the table's backend HDFS location, and reads the data.

By default every table is an inner table [managed table].

 To create an external table:

hive> create external table etab1(line string);

hive>load data local inpath 'file1'
      into table etab1;

hive> load data local inpath 'file2'
    into table etab1;

 now etab1 is created under the mydb database,
  and under the etab1 table directory we have two files.

   These file locations are updated in the Hive metastore (in the RDBMS).
 when this table is dropped from Hive...

hive> drop table etab1;

  -- from the RDBMS, the metadata of this table is deleted.
 -- but the table directory and its files are still available in HDFS.
  [ the data is not lost ]

 So, in future, Hive or other ecosystem tools can reuse this data. [adv: reusability]

How to reuse it.
----------------

hive> use mydb;
hive> create table etab1(line string);
hive> select * from etab1;
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb

-- when you create etab1,
  in HDFS, under the database location,
  one directory should be created.
  But under /user/hive/warehouse/mydb.db,
   etab1 already exists with two files.

 If the directory exists, Hive will use it;
   if it does not exist, Hive will create it.

============

hive> drop table etab1;
 -- both data and metadata will be deleted,
  because this time we created etab1 as an "inner" (managed) table.


=================

Both inner and external tables can use
   custom hdfs locations.

hive> create table mytab(line string)
      location '/user/mydata';

 in hdfs,
   /user/mydata directory will be created
hive> load data local inpath 'file1'
      into table mytab;

  now file1 will be copied into /user/mydata.


hive> drop table mytab;
 here mytab was created as an inner table,
  so both the metadata and the data (/user/mydata)
   will be deleted.

hive> create external table urtab(line string)
   location '/user/urdata';

 now in hdfs,
   /user/urdata  directory will be created.


hive> load data local inpath 'file1'
     into table urtab;

hive> load data local inpath 'file2'
     into table urtab;

hive> load data local inpath 'file3'
     into table urtab;

now file1,2,3 will be copied into
   /user/urdata directory of hdfs.

hive> drop table urtab;
 -- only the metadata in the RDBMS will be deleted.  The /user/urdata directory is still available with the file1, file2, file3 files.


Reusing next time:

hive> create table ourtab(line string)
     location '/user/urdata';

  here /user/urdata already exists in HDFS, so Hive will use it. If it did not exist, Hive would create it.

=====================================


Wednesday 19 July 2017

Pig Video Lessons

Pig class Links:
PigLab1 Video:
https://drive.google.com/file/d/0B6ZYkhJgGD6XTzVHbzBYUFY0a1k/view?usp=sharing

PigLab Notes1:
https://drive.google.com/file/d/0B6ZYkhJgGD6XeU9tUF9aS3QxUWc/view?usp=sharing

PigLab2 Video:
https://drive.google.com/file/d/0B6ZYkhJgGD6XNnhvZUN5eTJSaHM/view?usp=sharing

PigLab2 Notes:
https://drive.google.com/file/d/0B6ZYkhJgGD6Xd0ZHb1hWZVhjbmc/view?usp=sharing

PigLab3 Video:
https://drive.google.com/file/d/0B6ZYkhJgGD6XY3ZTWFFZZ3VMcnM/view?usp=sharing

PigLab3 Notes:
https://drive.google.com/file/d/0B6ZYkhJgGD6Xb1k1aklZOXdjaUE/view?usp=sharing

PigLab4 Video[its audio]:
https://drive.google.com/file/d/0B6ZYkhJgGD6XWHBoODhPTzJ3LVE/view?usp=sharing

PigLab4 Notes :

https://drive.google.com/file/d/0B6ZYkhJgGD6XU0xiSTM0c1VNcGM/view?usp=sharing

PigLab5 Video :
https://drive.google.com/file/d/0B6ZYkhJgGD6XMDFjNmI0ekg1LU0/view?usp=sharing

PigLab5 Notes :
https://drive.google.com/file/d/0B6ZYkhJgGD6XT0tvMGtheGE3anc/view?usp=sharing

PigLab6 Video :
https://drive.google.com/file/d/0B6ZYkhJgGD6XbEdmaG55a3hkRGc/view?usp=sharing


PigLab6 Notes :
https://drive.google.com/file/d/0B6ZYkhJgGD6XOVRUTkt3Y290YXM/view?usp=sharing

PigLab7 video :

https://drive.google.com/file/d/0B6ZYkhJgGD6XY25CUjlrS05QTjg/view?usp=sharing

PigLab7 Notes :

https://drive.google.com/file/d/0B6ZYkhJgGD6XbGlnQ05wbldUbUU/view?usp=sharing

PigLab8 Video :
https://drive.google.com/file/d/0B6ZYkhJgGD6XV2ZsUGFUV0tsUVk/view?usp=sharing

PigLab8 Notes :

https://drive.google.com/file/d/0B6ZYkhJgGD6XYW1xNEZpNjdTdHc/view?usp=sharing








Thursday 8 June 2017

Hive Partitioned tables [case study]


[cloudera@quickstart ~]$ cat saleshistory
01/01/2011,2000
01/01/2011,3000
01/02/2011,5000
01/02/2011,4000
01/02/2011,1000
01/03/2011,2000
01/25/2011,3000
01/25/2011,5000
01/29/2011,4000
01/29/2011,1000
02/01/2011,2000
02/01/2011,3000
02/02/2011,8000
03/02/2011,9000
03/02/2011,3000
03/03/2011,5000
03/25/2011,7000
03/25/2011,2000
04/29/2011,5000
04/29/2011,3000
05/01/2011,2000
05/01/2011,3000
05/02/2011,5000
05/02/2011,4000
06/02/2011,1000
06/03/2011,2000
06/25/2011,3000
07/25/2011,5000
07/29/2011,4000
07/29/2011,1000
08/01/2011,2000
08/01/2011,3000
08/02/2011,5000
09/02/2011,4000
09/02/2011,1000
09/03/2011,2000
09/25/2011,3000
10/25/2011,5000
10/29/2011,4000
10/29/2011,1000
10/29/2011,5000
11/01/2011,2000
11/01/2011,3000
11/02/2011,5000
11/02/2011,4000
11/02/2011,1000
11/03/2011,2000
11/25/2011,3000
12/25/2011,5000
12/29/2011,4000
12/29/2011,1000
12/30/2011,9000
12/30/2011,40000
[cloudera@quickstart ~]$

create table myraw(dt string, amt int)
      row format delimited
        fields terminated by ',';

load data local inpath 'saleshistory' into table myraw;

create table urraw like myraw;




-----------------
-- generate a bigger dataset: each union branch below replaces the last digit of the year (giving 2012..2016) and shifts the amount.

insert overwrite table urraw
   select * from (
      select dt, amt from myraw
            union all
   select concat(substr(dt,1,9),'2') as dt, amt+1000 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'3') as dt, amt+4000 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'4') as dt, amt+500 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'5') as dt, amt+8000 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'6') as dt, amt+1000 as amt
          from myraw ) s;

-- 01/01/2011     1000


create table ourraw(dt array<string> , amt int);

insert overwrite table ourraw
  select split(dt,'/') , amt  from urraw;

create table sales(dt string, amt int);

-- convert MM/DD/YYYY into YYYY-MM-DD so that year(), month(), day() can be applied
insert overwrite table sales
  select concat(dt[2],'-',dt[0],'-',dt[1]), amt from
        ourraw;




create table salespart(dt string, amt int)
   partitioned by (y int, m int, d int)
 row format delimited
   fields terminated by ',';

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.dynamic.partitions.pernode=10000;






insert overwrite table salespart
   partition (y, m, d)
  select dt, amt, year(dt), month(dt), day(dt)
        from sales;

     ------    sankara.deva2016@gmail.com
   -----sub:   partitions case study.

Wednesday 17 May 2017

Pig : Udfs using Python

we can keep multiple functions
  under one program(.py)

 transform.py
-------------------------
from pig_util import outputSchema

@outputSchema("name:chararray")
def firstUpper(x):
   fc = x[0].upper()
   rc = x[1:].lower()
   n = fc+rc
   return n

@outputSchema("sex:chararray")
def gender(x):
   if x=='m':
      x = 'Male'
   else:
      x = 'Female'
   return x

@outputSchema("dname:chararray")
def dept(dno):
   dname="Others"
   if dno==11:
       dname = 'Marketing'
   elif dno==12:
       dname="Hr"
   elif dno==13:
      dname="Finance"
   return dname

-----------------
register  'transform.py' using jython
  as myprog

res = foreach emp generate
     id, myprog.firstUpper(name) as name,
     sal , myprog.gender(sex) as sex,
       myprog.dept(dno) as dname;
-------------------------------------


pig unions :


 data1 = load 'file1' using PigStorage(',')
   as (name:chararray, city:chararray);

 data2 = load 'file2' using PigStorage(',')
   as (name:chararray, sal:int);

d1 = foreach data1 generate
        name, city, null as sal ;
d2 = foreach data2 generate
        name, null as city, sal;

d = union d1, d2;




Tuesday 16 May 2017

Python Examples 1



name = input("Enter name ")
age = input("Enter age")

print(name, " is ", age, " years old ")
-----------------------------------

# if

a = 10
b = 25
if a>b:
  print(a , " is big")
else:
   print(b , " is big ")
-----------------------------
# nested if
a = 10
b = 20
c = 17
big = 0
if a>b:
  if a>c:
    big=a
  else:
    big=c
elif b>c:
  big=b
else:
  big=c
print("Biggest is ", big)
----------------------------------

# if and loop combination:

lst = [10,20,34,23,12,34,23,45]

big = lst[0]

for v in lst:
  if v>big:
     big=v
print("max of the list ", big)

-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = 0
cnt = 0

for v in lst:
   tot += v
   cnt += 1
avg = tot/cnt
print("avgerage : ", avg)
-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = sum(lst)
cnt  = len(lst)
avg = tot/cnt
mx = max(lst)   # avoid shadowing the built-in max()
mn = min(lst)   # avoid shadowing the built-in min()
print("tot : ", tot)
print("count : ", cnt)
print("average ", avg)
print("maximum ", mx)
print("Minimum ", mn)
----------------------------

Accessing List Elements:

lst = [10,20,34,23,12,34,23,45]
print(lst[0])     # first element -> 10
print(lst[-1])    # last element -> 45
print(lst[2:5])   # slice of index 2,3,4 -> [34, 23, 12]

------------------------------

updating List Elements:


lst[3] = 100
print(lst)
lst[4:7] = [250]*3
print(lst)

a = [1]*10
print(a)
--------------------------

adding elements to list:

lst = [10,20,34,23,12]

lst.append(100)
lst.append(200)
print(lst)

-----------------------------

merging lists:

lst = [10,20,34,23,12]

lst2 = [100,200,300,400,500]

l = lst + lst2

lst += lst2
print(l)
print(lst)

---------------------------

Transformations:

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
    names2.append(x.upper())
print(names2)
------------------------------

Transformations:
  first char into upper case,
  and all remaining chars into lower case.

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
     fc = x[0].upper()
     rc = x[1:].lower()
     name=fc+rc
     names2.append(name)
print(names2)
----------------------------------

     sankara.deva2016@gmail.com
-------------------------------------


Spark : Spark streaming and Kafka Integration

steps:

 1)  start the ZooKeeper server
 2)  start Kafka brokers [ one or more ]
 3)  create a topic
 4)  start a console producer [ to write messages into the topic ]
 5)  start a console consumer [ to test whether messages are streamed ]
 6)  create a Spark streaming context,
     which streams from the Kafka topic
 7)  perform transformations or aggregations
 8)  output operation : direct the results into another Kafka topic
------------------------------------------





   

 


the following code was tested with
  Spark 1.6.0 and Kafka 0.10.2.0

kafka and spark streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka.KafkaUtils
//1.
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
val lines = kafkaStream.map(x => x._2.toUpperCase)

val warr = lines.flatMap(x => x.split(" "))
val pair = warr.map(x => (x,1))
val wc = pair.reduceByKey(_+_)

wc.print()
// use below code to write results into kafka topic
ssc.start

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing into kafka topic.

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>

                partition.foreach{
                  case (w, cnt) => {
                    val x = w+"\t"+cnt
                    val props = new HashMap[String, Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            "localhost:9092")
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")

                    println(x)
                    val producer = new KafkaProducer[String,String](props)
                    val message=new ProducerRecord[String, String]("results1",null,x)
                    producer.send(message)
                  }
                }))

-- execute above code before ssc.start.
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning





-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. KafkaUtils.createStream()
   needs 4 arguments:
    1st --->  streaming context
    2nd --->  zookeeper (zk) connection details
    3rd --->  consumer group id
    4th --->  topics
 
Spark streaming can read from multiple topics.
    Each topic is given as a key-value pair in a Map object:

key ---> topic name
value ---> number of consumer threads

 to read from multiple topics,
 the 4th argument should be as follows.
    Map("t1"->2,"t2"->4,"t3"->1)

-------------------------

   The given number of consumer threads is applied to each partition of the Kafka topic.

   ex: the topic has 3 partitions,
        and the consumer threads are 5,
   so the total number of threads = 15.

but these 15 threads are not executed in parallel:

at a shot, only the 5 threads for one partition are consuming data in parallel.

To make all (15) parallel, create one stream per partition and union them (see the sketch after the code below):

val numparts = 3
val kstreams = (1 to numparts).map{ x =>
    KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
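
To consume these parallel streams as one, the usual pattern (a sketch, not part of the original lab transcript; unified/words/counts are illustrative names) is to union them with ssc.union and then apply the word-count logic once:

// union the parallel receiver streams into a single DStream
val unified = ssc.union(kstreams)
// each Kafka record is a (key, message) pair; take the message part
val words = unified.flatMap(x => x._2.toUpperCase.split(" "))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.print()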


Pig : UDFs


Pig UDFS
----------

  UDF ---> user defined functions.
 
   adv:
       i)  custom functionalities.
      ii)  reusability.

 Pig UDFs can be developed by
    java
   python
    ruby
    c++
    javascript
    perl

step1:
   Develop udf code.

step2:
   export into jar file
   ex: /home/cloudera/Desktop/pigs.jar

step3:
   register jar file into pig.
 grunt> register Desktop/pigs.jar

step4:
   create a temporary function for the udf class.

 grunt> define  ucase pig.analytics.ConvertUpper();

step5:
  calling the function:

 grunt>e =  foreach emp generate
      id, ucase(name) as name, sal,
        ucase(sex) as sex, dno;

 
package  pig.analytics;
import .....

--> ucase(name) ---> upper conversion

public class ConvertUpper extends EvalFunc<String>
  {
     public String exec(Tuple v)
      throws IOException
     {
        String str = (String)v.get(0);
        String res = str.toUpperCase();
        return res;
           
     }

 
 }
--------------------------
$ cat > samp
100,230,400
123,100,90
140,560,430

$ hadoop fs -copyFromLocal samp piglab

grunt> s = load 'piglab/samp'
            using PigStorage(',')
          as (a:int, b:int, c:int);



package pig.analytics;
....
public class RMax extends EvalFunc<Integer>
{
    public  Integer exec(Tuple v)
     throws IOException
    {
      int a =(Integer) v.get(0);
      int b =(Integer) v.get(1);
      int c =(Integer) v.get(2);

      int big = a; // 10,20,3
      if (a>big) big = a;
      if (b>big) big = b;
      if (c>big) big = c;
      return  big;
    }
 }

export into jar : Desktop/pigs.jar

grunt> register Desktop/pigs.jar;

grunt> define rmax pig.analytics.RMax();

grunt> res = foreach s generate *,
                   rmax(*) as max;

--------------------------------

 package pig.analytics;
 .......
 public class RowMax
   extends EvalFunc<Integer>
 {
    public Integer exec(Tuple v) throws IOException
    {
     List<Object> lobs = v.getAll() ;
     int max = 0;
     int cnt =0;
    // -20,-3,-40
     for(Object o : lobs)
     {
       cnt++;
       int val = (Integer)o;
       if(cnt==1) max = val;
       max = Math.max(max, val);
     }
     return max;
    }
 }

export in to jar : Desktop/pigs.jar
grunt> register Desktop/pigs.jar
grunt> define dynmax pig.analytics.RowMax();
grunt> r = foreach s generate *, dynmax(*) as m;
-----------------------------------------

emp = load 'piglab/emp' using PigStorage(',')
   as (id:int, name:chararray, sal:int,
     sex:chararray, dno:int);

grade()
dname()
gender()


package pig.analytics;
public class Gender extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     String s =(String) v.get(0);
     s = s.toUpperCase();
     if (s.matches("F"))
       s = "Female";
     else
       s = "Male";
     return s;
 }
}
-----------------

package pig.analytics;
public class Grade extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     int sal = (Integer) v.get(0);
     String grade;
     if (sal>=70000)
       grade="A";
     else if (sal>=50000)
          grade="B";
         else if (sal>=30000)
               grade="C";
              else
               grade="D";
     return grade;
 }
}
------
package pig.analytics;
public class DeptName extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
    int dno = (Integer)v.get(0);
    String dname;
    switch (dno){
    case 11 :
          dname = "Marketing";
          break;
    case 12 :
          dname = "HR";
          break;
    case 13 :
          dname = "Finance";
          break;
    default:
          dname = "Others";
     }
    return dname;  
 }
}
---------------------------------

---------------------------
export into jar : Desktop/pigs.jar;
grunt> register Desktop/pigs.jar;
grunt> define gender pig.analytics.Gender();
grunt> define grade pig.analytics.Grade();
grunt> define dept pig.analytics.DeptName();

grunt> res = foreach emp generate
    id, ucase(name) as name,
     sal, grade(sal) as grade,
    gender(sex) as sex,
    dept(dno) as dname ;
---------------------------------


Pig : Cross Operator for Cartesian Product


 Cross:
 -----
   used for Cartesian product:

   each element of the left set joins with each element of the right set.


  ds1 --> (a)
          (b)
          (c)

  ds2 --> (1)
          (2)

  x = cross ds1, ds2;

   (a,1)
   (a,2)
   (b,1)
   (b,2)
   (c,1)
   (c,2)


emp = load 'piglab/emp' using PigStorage(',')
    as (id:int, name:chararray, sal:int,
  sex:chararray, dno:int);


task:
   find how many employees are below the avg sal,
  and how many are above the avg sal.

sals = foreach emp generate sal;

grp = group sals all;

avgsal = foreach grp generate AVG(sals.sal) as avg;
avgsal = foreach avgsal generate (int)avg;


e = cross sals, avgsal;
e = foreach e generate sals::sal as sal, avgsal::avg as avg;


stats = foreach e generate
       (sal>=avg ? 'Above':'Below') as stat;
grp2 = group stats by stat;
res = foreach grp2 generate group as stat,
     COUNT(stats) as cnt;
dump res
(Above,2)
(Below,6)


--In the above task, cross is used to make avgsal available to each row of emp,
  so that we can compare each sal with the avg.

--------------------------------------

2nd example:


[cloudera@quickstart ~]$ cat sales
01/01/2016,40000
01/03/2016,50000
01/25/2016,50000
02/01/2016,40000
02/03/2016,90000
02/25/2016,50000
03/01/2016,40000
03/03/2016,50000
04/25/2016,50000
05/01/2016,40000
05/03/2016,50000
06/25/2016,50000
06/01/2016,40000
06/03/2016,90000
06/25/2016,50000
07/01/2016,40000
07/03/2016,50000
07/25/2016,50000
08/01/2016,40000
09/03/2016,50000
09/25/2016,50000
10/01/2016,40000
10/03/2016,90000
10/25/2016,50000
10/01/2016,40000
11/03/2016,50000
12/25/2016,50000
12/01/2016,40000
12/03/2016,50000
12/25/2016,50000
12/01/2016,40000
12/03/2016,90000
12/25/2016,50000
12/01/2016,40000
12/03/2016,50000
12/25/2016,50000
[cloudera@quickstart ~]$

[cloudera@quickstart ~]$ hadoop fs -copyFromLocal sales piglab

monthly sales report.

sales = load 'piglab/sales'
   using PigStorage(',')
    as (dt:chararray, amt:int);

sales2 = foreach sales generate
     SUBSTRING(dt,0,2) as mon, amt;
s = foreach sales2 generate (int)mon, amt;

grp = group s by mon;
mrep = foreach grp generate group as mon,
              SUM(s.amt) as tot;

--quarterly sales report:

q = foreach mrep generate
       (mon <4 ? 1:
         (mon <7 ? 2:
           (mon <10 ? 3:4))) as qtr, tot;
qgrp = group q by qtr;
qrep = foreach qgrp generate
         group as qtr, SUM(q.tot) as tot;
dump qrep
(1,410000)
(2,370000)
(3,280000)
(4,780000)


qrep2 = foreach qrep generate *;

cr = cross qrep, qrep2;
cr = foreach cr generate
      qrep::qtr as q1, qrep2::qtr as q2,
      qrep::tot as tot1, qrep2::tot as tot2;

cr2 = filter cr by (q1-q2)==1;

rep = foreach cr2 generate *,
       ((tot1-tot2)*100)/tot2 as pgrowth;

dump rep;
--------------------------------

[cloudera@quickstart ~]$ cat matri
101,Amar,25,40000,hyd,m
102,Amala,23,50000,Del,f
103,Kiran,29,50000,hyd,m
104,Samantha,26,30000,hyd,f
105,Mani,30,70000,Del,m
106,Rakhul,24,40000,Del,f
107,Venu,34,100000,Pune,m
108,Ileana,29,200000,hyd,f

[cloudera@quickstart ~]$

hadoop fs -copyFromLocal matri piglab

applicants = load 'piglab/matri'
    using PigStorage(',')
    as (id:int, name:chararray,
       age:int, income:int, city:chararray,
        sex:chararray);

males = filter applicants by sex=='m';
fems = filter applicants by sex=='f';

mf = cross males, fems;

mf = foreach mf generate males::name as mname,
           fems::name as fname,
         males::age as mage,
         fems::age as fage;

res1 = filter mf by (mage>fage and mage-fage<4);

mlist = foreach res1 generate
           mname as src , fname as trg,
           mage as srcage, fage as trgage;
flist = foreach res1 generate
          fname as src, mname as trg,
           fage as srcage, mage as trgage;

list = union mlist, flist;




dump res1;


Pig : Joins

[cloudera@quickstart ~]$ hadoop fs -cat spLab/e
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
109,jj,10000,m,14
110,kkk,20000,f,15
111,dddd,30000,m,15
[cloudera@quickstart ~]$ hadoop fs -cat spLab/d
11,marketing,hyd
12,hr,del
13,fin,del
21,admin,hyd
22,production,del
[cloudera@quickstart ~]$
$ cat > joins.pig

 emp = load 'spLab/e' using PigStorage(',')
    as (id:int, name:chararray, sal:int,
       sex:chararray, dno:int);

 dept = load 'spLab/d' using PigStorage(',')
   as (dno:int, dname:chararray, dloc:chararray);

 ij = join  emp by dno, dept by dno;

 lj = join emp by dno left outer, dept by dno;

 rj = join emp by dno right outer, dept by dno;

 fj = join emp by dno full outer, dept by dno;


ctrl+d [to save]

grunt> run joins.pig
grunt> describe ij
ij: {emp::id: int,emp::name: chararray,emp::sal: int,emp::sex: chararray,emp::dno: int,dept::dno: int,dept::dname: chararray,dept::dloc: chararray}
grunt> describe fj
fj: {emp::id: int,emp::name: chararray,emp::sal: int,emp::sex: chararray,emp::dno: int,dept::dno: int,dept::dname: chararray,dept::dloc: chararray}
grunt> ed = foreach fj generate
>>    emp::id as id, emp::name as name, emp::sal as sal,
>>     emp::sex as sex, dept::dname as dname, dept::dloc as city;
-- flattening the schema: dropping the :: reference operators from the field names.

grunt> dump ed
(108,iiii,50000,m,marketing,hyd)
(101,aaaa,40000,m,marketing,hyd)
(106,dkd,40000,m,hr,del)
(105,ee,10000,m,hr,del)
(103,cccc,50000,m,hr,del)
(102,bbbbbb,50000,f,hr,del)
(107,sdkfj,80000,f,fin,del)
(104,dd,90000,f,fin,del)
(109,jj,10000,m,,)
(111,dddd,30000,m,,)
(110,kkk,20000,f,,)
(,,,,admin,hyd)
(,,,,production,del)

-----------------------------

grunt> ed1 = foreach fj generate
>>       emp::dno as dno1, dept::dno as dno2, emp::sal as sal;
grunt> describe ed1
ed1: {dno1: int,dno2: int,sal: int}
grunt> stats = foreach ed1 generate
>>   (dno1 is not null and dno2 is not null ? 'Working' :
>>     (dno2 is null ? 'BenchTeam' : 'BenchProject')) as stat, sal;
grunt> grp = group stats by stat;
grunt> res = foreach grp generate
>>     group as stat, SUM(stats.sal) as tot;
grunt> r = foreach res generate stat,
>>             (tot is null ? 0:tot) as tot;

grunt> dump r;
(Working,410000)
(BenchTeam,60000)
(BenchProject,0)


-----------------------------------------

to get top 3 salaries.


 sals = foreach emp generate sal;
 sals2 = distinct sals;
 sals3 = order sals2 by sal desc;
 sals4 = limit sals3 3;

 e = join emp by sal, sals4 by sal;
 etop3 = foreach e generate
     emp::id as id, emp::name as name,
        emp::sal as sal;
 dump etop3;

-------------------------------

 emp ---> id name sal sex dno
 dept --> dno dname dloc mid
 mngrs ---> mid  mname  phone


  What is the total salary budget for each manager?


[cloudera@quickstart ~]$ hadoop fs -copyFromLocal dept spLab/dd
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal mngrs spLab
[cloudera@quickstart ~]$

dep = load 'spLab/dd' using PigStorage(',')
   as (dno:int, dname:chararray, dloc:chararray,
  mid:chararray);

mgr = load 'spLab/mngrs' using PigStorage(',')
   as (mid:chararray, mname:chararray,
       ph:chararray);

edep = join emp by dno, dep by dno;

edep2 = foreach edep generate
         emp::sal as sal, dep::mid as mid;

edm = join edep2 by mid, mgr by mid;
edm2 = foreach edm generate
        mgr::mname as mname ,
          edep2::sal as sal;

grpx = group edm2 by mname;
resx = foreach grpx generate
       group as mname,
          SUM(edm2.sal) as tot;
dump resx
------------------------------------


Pig : Order [ Sorting ] , exec, run , pig


 order :-
   to sort data (tuples) in ascending or descending order.

 emp = load 'piglab/emp'
     using PigStorage(',')
     as (id:int, name:chararray,
    sal:int, sex:chararray, dno:int);

 e1 = order emp by name;
 e2 = order emp by sal desc;
 e3 = order emp by sal desc, sex, dno desc;

 ---------------------------------------
sql:
   select * from emp order by sal desc limit 3;

 e = order emp by sal desc;
 top3 = limit e 3;


limitation:

 101,aaa,30000,.....
 102,bbb,90000,....
 103,cccc,90000,....
 104,dd,90000,....
 105,ee,80000,.....

 the above process is correct only
  if there are no duplicated salaries.





--------------------------
 sql:
   select dno, sum(sal) as tot from emp
     group by dno
     order by tot desc
     limit 3;


 e =  foreach emp generate dno, sal;
 grp = group e by dno;
 res = foreach grp generate
         group as dno, SUM(e.sal) as tot;
 sorted = order res by tot desc;
 top3 = limit sorted 3;

limitation:

 dump res
 11,2l
 12,3l
 13,3l
 14,3l
 15,3l
 16,0.5l

 the above process is correct only
  if there are no duplicated sums across the dnos.

Solutions to the above two problems:
 i) a UDF
 ii) joins [see the top-3 salaries example in the Joins notes].

-------------------------------------

 executing scripts in pig.
--------------------------
 3 commands :
 i)pig
 ii) exec
 iii) run

Pig:
  to execute a script file from the command prompt.
  Aliases of relations are not available in the
   grunt shell afterwards.

 [cloudera@quickstart ~]$ pig  script1.pig

exec:
  to execute a script from the grunt shell.
 Aliases of relations are still not available,
 so we cannot reuse them.

grunt> exec script1.pig

run:
 
 to execute a script from the grunt shell.
 Aliases of relations are available in grunt,
 so you can reuse them.

--------------------------------------

 command   environment      aliases available in grunt?

 pig       command prompt   no
 exec      grunt            no
 run       grunt            yes
-----------------------------------------

When to use what:
 
  to deploy scripts from a shell script or Oozie,
  use "pig".

  to execute from grunt without overriding previously existing aliases of flows,
   use "exec".

 to execute from grunt and to reuse the aliases,
  use "run".

---------------------------------


Monday 8 May 2017

Spark : Joins 2

Denormalizing datasets using Joins
[cloudera@quickstart ~]$ cat > children
c101,p101,Ravi,34
c102,p101,Rani,24
c103,p102,Mani,20
c104,p103,Giri,22
c105,p102,Vani,22
[cloudera@quickstart ~]$ cat > parents
p101,madhu,madhavi,hyd
p102,Sathya,Veni,Del
p103,Varma,Varuna,hyd
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal children spLab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal parents spLab
[cloudera@quickstart ~]$
val children = sc.textFile("/user/cloudera/spLab/children")
val parents = sc.textFile("/user/cloudera/spLab/parents")
val chPair = children.map{ x =>
        val w = x.split(",")
        val pid = w(1)
        val chInfo = Array(w(0), w(2), w(3)).mkString(",")
        (pid, chInfo)
   }
chPair.collect.foreach(println)
(p101,c101,Ravi,34)
(p101,c102,Rani,24)
(p102,c103,Mani,20)
(p103,c104,Giri,22)
(p102,c105,Vani,22)
val PPair = parents.map{ x =>
           val w = x.split(",")
           val pid = w(0)
 val pInfo = Array(w(1),w(2),w(3)).mkString(",")
     (pid, pInfo)
   }
PPair.collect.foreach(println)
         
(p101,madhu,madhavi,hyd)
(p102,Sathya,Veni,Del)
(p103,Varma,Varuna,hyd)
val family = chPair.join(PPair)
family.collect.foreach(println)
(p101,(c101,Ravi,34,madhu,madhavi,hyd))
(p101,(c102,Rani,24,madhu,madhavi,hyd))
(p102,(c103,Mani,20,Sathya,Veni,Del))
(p102,(c105,Vani,22,Sathya,Veni,Del))
(p103,(c104,Giri,22,Varma,Varuna,hyd))
val profiles = family.map{ x =>
         val cinfo = x._2._1
         val pinfo = x._2._2
      val info = cinfo +","+ pinfo
       info
 }
profiles.collect.foreach(println)
c101,Ravi,34,madhu,madhavi,hyd
c102,Rani,24,madhu,madhavi,hyd
c103,Mani,20,Sathya,Veni,Del
c105,Vani,22,Sathya,Veni,Del
c104,Giri,22,Varma,Varuna,hyd


profiles.saveAsTextFile("/user/cloudera/spLab/profiles")
[cloudera@quickstart ~]$ hadoop fs -ls spLab/profiles
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-05-08 21:02 spLab/profiles/_SUCCESS
-rw-r--r--   1 cloudera cloudera        150 2017-05-08 21:02 spLab/profiles/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat spLab/profiles/part-00000
c101,Ravi,34,madhu,madhavi,hyd
c102,Rani,24,madhu,madhavi,hyd
c103,Mani,20,Sathya,Veni,Del
c105,Vani,22,Sathya,Veni,Del
c104,Giri,22,Varma,Varuna,hyd
[cloudera@quickstart ~]$



Spark : Joins


[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spLab/e
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal dept spLab/d
[cloudera@quickstart ~]$ hadoop fs -cat spLab/e
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
109,jj,10000,m,14
110,kkk,20000,f,15
111,dddd,30000,m,15
[cloudera@quickstart ~]$ hadoop fs -cat spLab/d
11,marketing,hyd
12,hr,del
13,fin,del
21,admin,hyd
22,production,del
[cloudera@quickstart ~]$
val emp = sc.textFile("/user/cloudera/spLab/e")
val dept = sc.textFile("/user/cloudera/spLab/d")
val epair = emp.map{x =>
               val w = x.split(",")
                val dno = w(4).toInt
                val sal = w(2).toInt
                (dno, sal)
             }
 epair.collect.foreach(println)
(11,40000)
(12,50000)
(12,50000)
(13,90000)
(12,10000)
(12,40000)
(13,80000)
(11,50000)
(14,10000)
val dpair = dept.map{ x =>
            val w = x.split(",")
            val dno = w(0).toInt
             val loc = w(2)
            (dno, loc)
            }
scala> dpair.collect.foreach(println)
(11,hyd)
(12,del)
(13,del)
(21,hyd)
(22,del)
-- inner join
val ij = epair.join(dpair)
ij.collect.foreach(println)
(13,(90000,del))
(13,(80000,del))
(11,(40000,hyd))
(11,(50000,hyd))
(12,(50000,del))
(12,(50000,del))
(12,(10000,del))
(12,(40000,del))
-- left outer join
val lj = epair.leftOuterJoin(dpair)
scala> lj.collect.foreach(println)
(13,(90000,Some(del)))
(13,(80000,Some(del)))
(15,(20000,None))
(15,(30000,None))
(11,(40000,Some(hyd)))
(11,(50000,Some(hyd)))
(14,(10000,None))
(12,(50000,Some(del)))
(12,(50000,Some(del)))
(12,(10000,Some(del)))
(12,(40000,Some(del)))
-- right outer join
val rj = epair.rightOuterJoin(dpair)
rj.collect.foreach(println)
(13,(Some(90000),del))
(13,(Some(80000),del))
(21,(None,hyd))
(22,(None,del))
(11,(Some(40000),hyd))
(11,(Some(50000),hyd))
(12,(Some(50000),del))
(12,(Some(50000),del))
(12,(Some(10000),del))
(12,(Some(40000),del))
-- full outer join
val fj = epair.fullOuterJoin(dpair)
fj.collect.foreach(println)
(13,(Some(90000),Some(del)))
(13,(Some(80000),Some(del)))
(15,(Some(20000),None))
(15,(Some(30000),None))
(21,(None,Some(hyd)))
(22,(None,Some(del)))
(11,(Some(40000),Some(hyd)))
(11,(Some(50000),Some(hyd)))
(14,(Some(10000),None))
(12,(Some(50000),Some(del)))
(12,(Some(50000),Some(del)))
(12,(Some(10000),Some(del)))
(12,(Some(40000),Some(del)))

location based aggregations:
val locSal = fj.map{ x =>
       val sal = x._2._1
       val loc = x._2._2
       val s = if(sal==None) 0 else sal.get
   val l = if(loc==None) "NoCity" else loc.get
       (l, s)
 }
locSal.collect.foreach(println)
(del,90000)
(del,80000)
(NoCity,20000)
(NoCity,30000)
(hyd,0)
(del,0)
(hyd,40000)
(hyd,50000)
(NoCity,10000)
(del,50000)
(del,50000)
(del,10000)
(del,40000)
val locSummary = locSal.reduceByKey(_+_)
scala> locSummary.collect.foreach(println)
(hyd,90000)
(del,320000)
(NoCity,60000)
-----------------
val stats = fj.map{ x =>
       val sal = x._2._1
       val loc = x._2._2
val stat = if(sal!=None & loc!=None)  "Working" else
     if(sal==None) "BenchProj" else "BenchTeam"
       val s = if(sal==None) 0 else sal.get
      (stat, s)
 }
stats.collect.foreach(println)
(Working,90000)
(Working,80000)
(BenchTeam,20000)
(BenchTeam,30000)
(BenchProj,0)
(BenchProj,0)
(Working,40000)
(Working,50000)
(BenchTeam,10000)
(Working,50000)
(Working,50000)
(Working,10000)
(Working,40000)
val res = stats.reduceByKey(_+_)
res.collect.foreach(println)
(BenchTeam,60000)
(Working,410000)
(BenchProj,0)

     

Pig : Foreach Operator


Foreach Operator:
-------------------
grunt> emp = load 'piglab/emp' using PigStorage(',')
>>  as (id:int, name:chararray, sal:int,
>>     sex:chararray, dno:int);
i) to copy data from one relation to
   another relation.
 emp2 = emp
 or
 emp2 = foreach emp generate *;
-------------------------------------
ii) selecting wanted fields;
grunt> e2 = foreach emp generate name,sal,dno;
grunt> describe e2
e2: {name: chararray,sal: int,dno: int}
grunt>
iii) changing field order:
grunt> describe emp;
emp: {id: int,name: chararray,sal: int,sex: chararray,dno: int}
grunt> e3 = foreach emp generate id,name,dno,sex,sal;
grunt> describe e3;
e3: {id: int,name: chararray,dno: int,sex: chararray,sal: int}
grunt> illustrate e3
---------------------------------------------------------------------------------------------
| emp     | id:int     | name:chararray     | sal:int     | sex:chararray     | dno:int     |
---------------------------------------------------------------------------------------------
|         | 101        | aaaa               | 40000       | m                 | 11          |
---------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------
| e3      | id:int     | name:chararray     | dno:int     | sex:chararray     | sal:int     |
---------------------------------------------------------------------------------------------
|         | 101        | aaaa               | 11          | m                 | 40000       |
---------------------------------------------------------------------------------------------
grunt>
iv) generating new fields.
grunt> e4 = foreach emp generate * ,
>>        sal*0.1 as tax, sal*0.2 as hra,
   sal+hra-tax as net;
 -- the above statement will fail, because new field aliases cannot be reused in the same statement.
solution:
grunt> e4 = foreach emp generate *,
>>     sal*0.1 as tax, sal*0.2 as hra;
grunt> e4 = foreach e4 generate *,
                     sal+hra-tax as net;
grunt> dump e4
v) converting data types:
  [ explicit casting ]
grunt> e5 = foreach e4 generate
>>        id,name,sal, (int)tax, (int)hra, (int)net, sex, dno;
grunt> describe e5
e5: {id: int,name: chararray,sal: int,tax: int,hra: int,net: int,sex: chararray,dno: int}
grunt>
vi) renaming fields.
grunt> describe emp
emp: {id: int,name: chararray,sal: int,sex: chararray,dno: int}
grunt> e6 = foreach emp generate
>>    id as ecode, name , sal as income, sex as gender, dno;
grunt> describe e6
e6: {ecode: int,name: chararray,income: int,gender: chararray,dno: int}
grunt>
vii) conditional transformations:
----------------------------------
[cloudera@quickstart ~]$ cat > test1
100 200
300 120
400 220
300 500
10 90
10 5
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal test1 piglab
[cloudera@quickstart ~]$
grunt> r1 = load 'piglab/test1' as
            (a:int, b:int);
grunt> r2 = foreach r1 generate *,
          (a>b ?  a:b) as big;
grunt> dump r2
(100,200,200)
(300,120,300)
(400,220,400)
(300,500,500)
(10,90,90)
(10,5,10)
grunt>

 The ternary operator (conditional operator) is used
for conditional transformations.
 syntax:
  (criteria  ?  TrueValue : FalseValue)


--nested conditions
grunt> cat piglab/samp1
100 200 300
400 500 900
100 120 23
123 900 800
grunt> s1 = load 'piglab/samp1'
       as (a:int, b:int, c:int);
grunt> s2 = foreach s1 generate *,
      (a>b ? (a>c ? a:c): (b>c ? b:c)) as big,
       (a<b ? (a<c ? a:c): (b<c ? b:c))
            as small;
grunt> dump s2
(100,200,300,300,100)
(400,500,900,900,400)
(100,120,23,120,23)
(123,900,800,900,123)
----------------
grunt> describe emp
emp: {id: int,name: chararray,sal: int,sex: chararray,dno: int}
grunt> e7 = foreach emp generate
    id, name , sal, (sal>=70000 ? 'A':
                     (sal>=50000 ? 'B':
                    (sal>=30000 ? 'C':'D')))
                     as grade,
    (sex=='m'  ? 'Male':'Female') as sex,
    (dno==11 ? 'Marketing':
       (dno==12 ? 'Hr':
           (dno==13 ? 'Finance':'Others')))
               as   dname;
grunt> store e7 into 'piglab/e7'
                 using    PigStorage(',')
grunt> ls piglab/e7
hdfs://quickstart.cloudera:8020/user/cloudera/piglab/e7/_SUCCESS<r 1>        0
hdfs://quickstart.cloudera:8020/user/cloudera/piglab/e7/part-m-00000<r 1>        228
grunt> cat piglab/e7/part-m-00000
101,aaaa,40000,C,Male,Marketing
102,bbbbbb,50000,B,Female,Hr
103,cccc,50000,B,Male,Hr
104,dd,90000,A,Female,Finance
105,ee,10000,D,Male,Hr
106,dkd,40000,C,Male,Hr
107,sdkfj,80000,A,Female,Finance
108,iiii,50000,B,Male,Marketing
grunt>
--------------------------
-- cleaning nulls using conditional transformations.
[cloudera@quickstart ~]$ cat > damp
100,200,
,300,500
500,,700
500,,
,,700
,800,
1,2,3
10,20,30
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal damp piglab
[cloudera@quickstart ~]$
grunt> d = load 'piglab/damp'
    using PigStorage(',')
    as (a:int, b:int, c:int);
grunt> dump d
(100,200,)
(,300,500)
(500,,700)
(500,,)
(,,700)
(,800,)
(1,2,3)
(10,20,30)

 d1 = foreach d generate *, a+b+c as tot;
 dump d1
(100,200,,)
(,300,500,)
(500,,700,)
(500,,,)
(,,700,)
(,800,,)
(1,2,3,6)
(10,20,30,60)
 -- a+b+c becomes null whenever any of a, b, c is null, so replace nulls with 0 first:
grunt> d2 = foreach d generate
     (a is null ? 0:a)  as a,
     (b is null ? 0:b) as b,
     (c is null ? 0:c) as c;
grunt> res = foreach d2 generate *, a+b+c as d;
grunt> dump res
(100,200,0,300)
(0,300,500,800)
(500,0,700,1200)
(500,0,0,500)
(0,0,700,700)
(0,800,0,800)
(1,2,3,6)
(10,20,30,60)
----------------------------------------
foreach:
  -- copy relation to relation
  -- selecting fields
  -- changing field order
  -- generating new fields.
  -- changing data types.
  -- renaming fields
  -- conditional transformations.
----------------------------------


Pig : Subsetting using Filter, Limit, Sample

Techniques of subsetting relations:
 i) filter: used for conditional filtering.
 ii) limit : takes the first n tuples.
 iii) sample: to take random sample sets
      [ "with replacement" model ].

filter: conditional subsetting.
       
 e1 = filter emp by sex=='m';
 grunt> dump e1
(101,aaaa,40000,m,11)
(103,cccc,50000,m,12)
(105,ee,10000,m,12)
(106,dkd,40000,m,12)
(108,iiii,50000,m,11)


 e2 = filter emp by (sex=='m' and sal>=40000);
 e3 = filter sales by (pid=='p1' or pid=='p4'
            or pid=='p20');
 e4 = filter sales by (city!='hyd'
            and city!='del'
            and city!='pune');
 e5 = filter emp  by  (
          (sex=='m'  and (city=='hyd' or city=='del'))
              or
           (sex=='f'  and (city=='blore' or
         city=='chennai'))
              or
           city=='noida'
         );
---------------------------------------
-- to fetch first n number of tuples.
grunt> f3 = limit emp 3;
grunt> dump f3
 -- limitation: cannot fetch the last n tuples.
-----------------------------
sample: --> used to get random samples.
  s1 = sample sales 0.1;
  s2 = sample sales 0.5;
----------------------------------


Pig : load Operator


Load Operator:
--------------
 to load data from file to relation.
 [cloudera@quickstart ~]$ cat > samp1
100 200 300
400 500 900
100 120 23
123 900 800
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal samp1  piglab
[cloudera@quickstart ~]$

grunt> s1 = load 'piglab/samp1' using PigStorage('\t')
>>          as (a:int, b:int, c:int);
grunt> s2 = load 'piglab/samp1' using PigStorage()
>>          as (a:int, b:int, c:int);
grunt> s3 = load 'piglab/samp1'
>>          as (a:int, b:int, c:int);
grunt> dump s3
(100,200,300)
(400,500,900)
(100,120,23)
(123,900,800)
grunt> dump s2
(100,200,300)
(400,500,900)
(100,120,23)
(123,900,800)
grunt> dump s1
(100,200,300)
(400,500,900)
(100,120,23)
(123,900,800)
The outputs of s1, s2, and s3 are the same.
 In s2, PigStorage() defaults to the \t delimiter.
 In s3, of PigStorage() and BinStorage(),
   PigStorage() is applied by default, with the \t delimiter.

 So the meaning of s1, s2, and s3 is the same.
s4 = load 'piglab/samp1'
      as (a:int, b:int, c:int, d:int);
 The first 3 fields of the file are mapped to the a, b, c fields;
   there is no 4th field in the file,
  so d will become null.
grunt> dump s4
(100,200,300,)
(400,500,900,)
(100,120,23,)
(123,900,800,)
-- following is to skip last fields.
grunt> s5 = load 'piglab/samp1'
>>    as (a:int, b:int)
>> ;
grunt> illustrate s5
--------------------------------
| s5     | a:int    | b:int    |
--------------------------------
|        | 100      | 120      |
--------------------------------
-- but to skip middle fields, take the help of the foreach operator. [later]

Loading non-tab-delimited files into a Pig relation:

[cloudera@quickstart ~]$ cat > samp2
100,10,1
2,200,20
3,30,300
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal samp2 piglab
grunt> ss1 = load 'piglab/samp2' as (a:int, b:int, c:int);
(,,)
(,,)
(,,)
 Here load expects the \t delimiter,
  but the file has no tabs,
   so the entire line is one field, which is a string.
  This is mapped to the first field of the relation, a, but as an int;
  the cast fails, so a became null. The file does not have 2nd and 3rd fields, so b and c also became null.
grunt> ss2 = load 'piglab/samp2'
         as    (a:chararray, b:int, c:int);
(100,10,1,,)
(2,200,20,,)
(3,30,300,,)
grunt> ss3 = load 'piglab/samp2'
        using PigStorage(',')
       as (a:int, b:int, c:int);
grunt> dump ss3
(100,10,1)
(2,200,20)
(3,30,300)
grunt> cat piglab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
grunt> emp = load 'piglab/emp'
>>     using PigStorage(',')
>>    as (id:int, name:chararray, sal:int, sex:chararray, dno:int);
grunt> illustrate emp
-------------
| emp     | id:int    | name:chararray    | sal:int    | sex:chararray    | dno:int    |
----------------------------------------------------------------------------------------
|         | 104       | dd                | 90000      | f                | 13         |
----------------------------------------------------------------------------------------


Thursday 4 May 2017

Spark : CoGroup And Handling Empty Compact Buffers



Co Grouping using Spark:-
-------------------------
scala> branch1.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
scala> branch2.collect.foreach(println)
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
scala> def toDnoSalPair(line:String)  = {
            val w = line.split(",")
            val dno = w(4).toInt
            val dname = dno match{
              case 11 => "Marketing"
              case 12 => "Hr"
             case 13 => "Finance"
             case _ => "Other"
            }
           val sal = w(2).toInt
          (dname, sal)
      }
toDnoSalPair: (line: String)(String, Int)
scala> toDnoSalPair("101,aaaaa,60000,m,12")
res22: (String, Int) = (Hr,60000)
scala>
scala> val pair1  = branch1.map(x => toDnoSalPair(x))
pair1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:33
scala> val pair2  = branch2.map(x => toDnoSalPair(x))
pair2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:33
scala> pair1.collect.foreach(println)
(Marketing,40000)
(Hr,50000)
(Hr,50000)
(Finance,90000)
(Hr,10000)
(Hr,40000)
(Finance,80000)
(Marketing,50000)
scala> pair2.collect.foreach(println)
(Hr,80000)
(Marketing,90000)
(Finance,100000)
(Hr,50000)
(Other,30000)
(Other,30000)
scala>
scala> val cg = pair1.cogroup(pair2)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[24] at cogroup at <console>:39
scala> cg.collect.foreach(println)
(Hr,(CompactBuffer(50000, 50000, 10000, 40000),CompactBuffer(80000, 50000)))
(Other,(CompactBuffer(),CompactBuffer(30000, 30000)))
(Marketing,(CompactBuffer(40000, 50000),CompactBuffer(90000)))
(Finance,(CompactBuffer(90000, 80000),CompactBuffer(100000)))
scala>
scala> val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val tot = tot1+tot2
            (dname,tot1,tot2,tot)
           }
scala> res.collect.foreach(println)
(Hr,150000,130000,280000)
(Other,0,60000,60000)
(Marketing,90000,90000,180000)
(Finance,170000,100000,270000)
From the above, the sum of an empty CompactBuffer and
        the size of an empty CompactBuffer are both zero,
 but we get a problem with
    sum/size (avg, a division by zero) and with max, min (which fail on an empty buffer).


val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val max1 = cb1.max
            val max2 = cb2.max
            (dname,max1,max2)
           }
-- res.collect cannot execute:
   there is a problem with max on an empty CompactBuffer.
 -- we get the same for min.
 val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
             (dname, (tot1,cnt1), (tot2,cnt2))
           }
-- no problem with sum and size on empty compact buffer.
val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
            val avg1 = (tot1/cnt1).toInt
            val avg2 = (tot2/cnt2).toInt
            (dname, avg1, avg2)
           
           }
res.collect will fail,
  because for avg, zero (the size of an empty buffer) appears in the denominator.
Solution:
----------
val res = cg.map{ x =>
            val dname = x._1
            val cb1 = x._2._1
            val cb2 = x._2._2
            val tot1 = cb1.sum
            val tot2 = cb2.sum
            val cnt1 = cb1.size
            val cnt2 = cb2.size
            var max1 = 0
            var min1 = 0
            var avg1 = 0
            if (cnt1!=0){
                  avg1 = tot1/cnt1
                  max1 = cb1.max
                  min1 = cb1.min
            }
            var max2 = 0
            var min2 = 0
            var avg2 = 0
            if (cnt2!=0){
                  avg2 = tot2/cnt2
                  max2 = cb2.max
                  min2 = cb2.min
            }
       (dname,(tot1,cnt1,avg1,max1,min1),
          (tot2,cnt2,avg2,max2,min2))             }
scala> res.collect.foreach(println)
(Hr,(150000,4,37500,50000,10000),(130000,2,65000,80000,50000))
(Other,(0,0,0,0,0),(60000,2,30000,30000,30000))
(Marketing,(90000,2,45000,50000,40000),(90000,1,90000,90000,90000))
(Finance,(170000,2,85000,90000,80000),(100000,1,100000,100000,100000))
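
A more compact way to guard the empty-CompactBuffer case (a sketch, not from the original notes; res2 and the local stats helper are illustrative names) is Scala's reduceOption, which returns None for an empty collection:

// same per-branch stats, using reduceOption/getOrElse instead of if(cnt != 0) blocks
val res2 = cg.map{ x =>
            val dname = x._1
            val (cb1, cb2) = x._2
            def stats(cb: Iterable[Int]) = {
              val tot = cb.sum
              val cnt = cb.size
              val avg = if (cnt == 0) 0 else tot/cnt            // still guard the division
              val mx  = cb.reduceOption(_ max _).getOrElse(0)   // 0 for an empty buffer
              val mn  = cb.reduceOption(_ min _).getOrElse(0)
              (tot, cnt, avg, mx, mn)
            }
            (dname, stats(cb1), stats(cb2))
           }
res2.collect.foreach(println)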
-----------------------------
 Cogroup on more than two
scala> val p1 = sc.parallelize(List(("m",10000),("f",30000),("m",50000)))
p1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27
scala> val p2 = sc.parallelize(List(("m",10000),("f",30000)))
p2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:27
scala> val p3 = sc.parallelize(List(("m",10000),("m",30000)))
p3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:27
scala> val cg = p1.cogroup(p2,p3)
cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[34] at cogroup at <console>:33
scala> cg.collect.foreach(println)
(f,(CompactBuffer(30000),CompactBuffer(30000),CompactBuffer()))
(m,(CompactBuffer(10000, 50000),CompactBuffer(10000),CompactBuffer(10000, 30000)))
scala> val r = cg.map{x =>
     |       val sex = x._1
     |       val tot1 = x._2._1.sum
     |       val tot2 = x._2._2.sum
     |       val tot3 = x._2._3.sum
     |       (sex, tot1, tot2, tot3)
     |   }
r: org.apache.spark.rdd.RDD[(String, Int, Int, Int)] = MapPartitionsRDD[35] at map at <console>:37
scala> r.collect.foreach(println)
(f,30000,30000,0)
(m,60000,10000,40000)
scala>


Spark : Union and Distinct



 Unions in spark.
val l1 = List(10,20,30,40,50)
val l2 = List(100,200,300,400,500)
val r1 = sc.parallelize(l1)
val r2 = sc.parallelize(l2)
val r = r1.union(r2)
scala> r.collect.foreach(println)
10
20
30
40
50
100
200
300
400
500
scala> r.count
res1: Long = 10

Spark union allows duplicates.
Using the ++ operator, merging can also be done.
scala> val r3 = r1 ++ r2
r3: org.apache.spark.rdd.RDD[Int] = UnionRDD[3] at $plus$plus at <console>:35
scala> r3.collect
res4: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500)
scala>
Merging more than two sets:
scala> val rx = sc.parallelize(List(15, 25, 35))   // a third RDD; its values (15, 25, 35) appear in the output below
scala> val rr = r1.union(r2).union(rx)
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[6] at union at <console>:37
scala> rr.count
res5: Long = 13
scala> rr.collect
res6: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>// or
scala> val rr = r1 ++ r2 ++ rx
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[8] at $plus$plus at <console>:37
scala> rr.collect
res7: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>
--- eliminate duplicates.
scala> val x = List(10,20,30,40,10,10,20)
x: List[Int] = List(10, 20, 30, 40, 10, 10, 20)
scala> x.distinct
res8: List[Int] = List(10, 20, 30, 40)
scala> val y = sc.parallelize(x)
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:29
scala> r1.collect
res14: Array[Int] = Array(10, 20, 30, 40, 50)
scala> y.collect
res15: Array[Int] = Array(10, 20, 30, 40, 10, 10, 20)
scala> val nodupes = (r1 ++ y).distinct
nodupes: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at distinct at <console>:35
scala> nodupes.collect
res16: Array[Int] = Array(30, 50, 40, 20, 10)
scala>
---------------------------------------
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp2
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
[cloudera@quickstart ~]$
scala> val branch1 = sc.textFile("/user/cloudera/spLab/emp")
branch1: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[15] at textFile at <console>:27
scala> val branch2 = sc.textFile("/user/cloudera/spLab/emp2")
branch2: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp2 MapPartitionsRDD[17] at textFile at <console>:27
scala> val emp = branch1.union(branch2)
emp: org.apache.spark.rdd.RDD[String] = UnionRDD[18] at union at <console>:31
scala> emp.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
--------------------------------
 distinct:
   eliminates duplicates
   based on an entire-row match.
 limitation: it cannot eliminate duplicates based on a match of only some column(s).
   the solution for that:
     iterate the CompactBuffer (from groupByKey).
   [ later we will see ]
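Until then, a minimal hedged sketch of column-based de-duplication, using reduceByKey to keep one (arbitrary) row per key instead of iterating the CompactBuffer; the sample rows below are made up for illustration.

// hedged sketch: de-duplicate by a chosen column (column 0 = id),
// instead of distinct on the entire row; sample rows are hypothetical
val rows = sc.parallelize(List(
  "101,aaaa,40000,m,11",
  "101,aaaa,40000,m,11",     // exact duplicate
  "101,aaaa,42000,m,11"))    // same id, different salary

val byId  = rows.map(x => (x.split(",")(0), x))    // (id, full row)
val dedup = byId.reduceByKey((a, b) => a).values   // keep one row per id (arbitrary pick)

dedup.collect.foreach(println)   // exactly one row survives for id 101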
grouping aggregation on the merged set:
scala> val pair = emp.map{ x =>
     |       val w = x.split(",")
     |       val dno = w(4).toInt
     |       val sal = w(2).toInt
     |      (dno, sal)
     | }
pair: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at map at <console>:35
scala> val eres = pair.reduceByKey(_+_)
eres: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[20] at reduceByKey at <console>:37
scala> eres.collect.foreach(println)
(14,60000)
(12,280000)
(13,270000)
(11,180000)
scala>
-- in this output we don't have separate totals for branch1 and branch2.
 the solution for this: COGROUP.
    [ NEXT DOCUMENT ]
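As a hedged preview of that next document, the same idea can be sketched with Spark's cogroup on the branch1 and branch2 RDDs loaded above; this is an illustrative sketch, not part of the lab transcript.

// cogroup keeps the values of each input RDD in its own iterable per key,
// so per-branch totals stay separate
val p1 = branch1.map{ x => val w = x.split(","); (w(4).toInt, w(2).toInt) }   // (dno, sal)
val p2 = branch2.map{ x => val w = x.split(","); (w(4).toInt, w(2).toInt) }

val cg = p1.cogroup(p2)          // RDD[(Int, (Iterable[Int], Iterable[Int]))]
val totals = cg.map{ case (dno, (s1, s2)) => (dno, s1.sum, s2.sum) }
totals.collect.foreach(println)  // one line per dno: (dno, branch1 total, branch2 total)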

Pig : CoGroup examples Vs Union Examples


-- co grouping
grunt> cat piglab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
grunt>
[cloudera@quickstart ~]$ cat > emp2
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp2 piglab
[cloudera@quickstart ~]$
 sql:
   select dno, sum(sal) from (
      select dno, sal from emp1
              union all
      select dno, sal from emp2
       ) e group by dno;
grunt> emp1 = load 'piglab/emp' using PigStorage(',')
         as (id:int, name:chararray, sal:int, sex:chararray, dno:int);
grunt> emp2 = load 'piglab/emp2' using PigStorage(',')
         as (id:int, name:chararray, sal:int, sex:chararray, dno:int);
grunt> describe emp1
emp1: {id: int,name: chararray,sal: int,sex: chararray,dno: int}
grunt> describe emp2
emp2: {id: int,name: chararray,sal: int,sex: chararray,dno: int}
grunt> e1 = foreach emp1 generate dno, sal;
grunt> e2 = foreach emp2 generate dno, sal;
grunt> e = union e1, e2;
grunt> grp = group e by dno;
grunt> res = foreach grp generate group as dno,
               SUM(e.sal) as tot;
grunt> dump res
(11,180000)
(12,280000)
(13,270000)
(14,60000)
 -- in the above output we lose clarity:
    it is the aggregation of all branches together,
    not a per-branch total.
 -- we want a separate total salary for branch1
    and a separate one for branch2.
 -- for this, cogroup is used.
 -- cogroup constructs separate inner bags
    for each relation (dataset),
    so that separate aggregations can be applied.
grunt> describe e1
e1: {dno: int,sal: int}
grunt> describe e2
e2: {dno: int,sal: int}
grunt> cg = cogroup e1 by dno, e2 by dno;
grunt> describe cg
cg: {group: int,e1: {(dno: int,sal: int)},e2: {(dno: int,sal: int)}}
grunt> dump cg
 -- when cogroup is applied, it returns
    n+1 fields,
    where n is the number of input relations (datasets):
    the first field is the group,
    and from the 2nd field onwards come the bags.
(11,{(11,50000),(11,40000)},{(11,90000)})
(12,{(12,40000),(12,10000),(12,50000),(12,50000)},{(12,50000),(12,80000)})
(13,{(13,80000),(13,90000)},{(13,100000)})
(14,{},{(14,30000),(14,30000)})
grunt> res = foreach cg generate
        group as dno ,
         SUM(e1.sal) as tot1,
         SUM(e2.sal) as tot2;
grunt> describe res
res: {dno: int,tot1: long,tot2: long}
grunt> dump res
(11,90000,90000)
(12,150000,130000)
(13,170000,100000)
(14,,60000)
---------------------------------
  how to perform
    separate aggregations on each dataset
    without cogrouping:
grunt> describe e1
e1: {dno: int,sal: int}
grunt> describe e2
e2: {dno: int,sal: int}
grunt> ee1 = foreach e1 generate *, 'branch1' as branch;
grunt> ee2 = foreach e2 generate *, 'branch2' as branch;
grunt> ee = union ee1 , ee2;
grunt> grp = group ee by (dno, branch);
grunt> res = foreach grp generate
>>         group.dno as dno, group.branch as branch,
>>         SUM(ee.sal) as tot;
grunt> describe res
res: {dno: int,branch: chararray,tot: long}
grunt> dump res
(11,branch1,90000)
(11,branch2,90000)
(12,branch1,150000)
(12,branch2,130000)
(13,branch1,170000)
(13,branch2,100000)
(14,branch2,60000)
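For comparison, the same branch-tagging trick can be sketched in Spark (Scala), reusing the branch1 and branch2 RDDs from the earlier Spark session; this is an illustrative sketch, not part of the Pig lab.

// tag each (dno, sal) pair with its branch, union, then aggregate by (dno, branch)
val t1 = branch1.map{ x => val w = x.split(","); ((w(4).toInt, "branch1"), w(2).toInt) }
val t2 = branch2.map{ x => val w = x.split(","); ((w(4).toInt, "branch2"), w(2).toInt) }

val res = (t1 ++ t2).reduceByKey(_ + _)
res.collect.foreach(println)   // e.g. ((11,branch1),90000), ((11,branch2),90000), ...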
---------------------------------
 using cogroup: multiple aggregations,
  computed separately for each dataset.
grunt> describe e1
e1: {dno: int,sal: int}
grunt> describe e2;
e2: {dno: int,sal: int}
grunt> cg = cogroup e1 by dno, e2 by dno;
grunt> res = foreach cg generate
>>     group as dno,
>>     SUM(e1.sal) as tot1,
>>     SUM(e2.sal) as tot2,
>>     COUNT(e1) as cnt1,
>>     COUNT(e2) as cnt2,
>>     MAX(e1.sal) as max1,
>>     MAX(e2.sal) as max2;
grunt> describe res;
res: {dno: int,tot1: long,tot2: long,cnt1: long,cnt2: long,max1: int,max2: int}
grunt> dump res
(11,90000,90000,2,1,50000,90000)
(12,150000,130000,4,2,50000,80000)
(13,170000,100000,2,1,90000,100000)
(14,,60000,0,2,,30000)


------------------------------
   Entire Column aggregations using CoGroup.

 s1 = foreach emp1 generate sal;
 s2 = foreach emp2 generate sal;
 g = cogroup s1  all, s2 all ;
 r = foreach g generate
      SUM(s1.sal) as tot1,
       SUM(s2.sal) as tot2;
 r = foreach r generate * , tot1+tot2 as tot;
dump r
(410000,380000,790000)
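For comparison, a hedged Spark (Scala) sketch of the same entire-column totals, again reusing the branch1 and branch2 RDDs from the earlier Spark session:

// whole-column totals without any grouping key
val tot1 = branch1.map(_.split(",")(2).toInt).sum   // all branch1 salaries
val tot2 = branch2.map(_.split(",")(2).toInt).sum   // all branch2 salaries
println((tot1, tot2, tot1 + tot2))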
     
Wednesday 3 May 2017

Spark : Conditional Transformations


 Conditions Transformations:

val trans = emp.map{ x =>
      val w = x.split(",")
      val sal = w(2).toInt

      // grade based on salary slab
      val grade = if(sal>=70000) "A" else
                    if(sal>=50000) "B" else
                      if(sal>=30000) "C" else "D"
      val tax = sal*10/100

      // map dno to a department name
      val dno = w(4).toInt
      val dname = dno match {
                  case 11 => "Marketing"
                  case 12 => "Hr"
                  case 13 => "Finance"
                  case _  => "Other"
            }

      // expand the sex code
      var sex = w(3).toLowerCase
      sex = if(sex=="m") "Male" else "Female"

      // rebuild the record as a comma-separated line
      val res = Array(w(0), w(1), w(2), tax.toString, grade, sex, dname).mkString(",")
      res
  }

 trans.saveAsTextFile("/user/cloudera/spLab/results4")
-----------------------------------------
   


               

Spark : Handling CSV files .. Removing Headers

scala> val l = List(10,20,30,40,50,56,67)

scala> val r = sc.parallelize(l)

scala> val r2 = r.collect.reverse.take(3)
r2: Array[Int] = Array(67, 56, 50)

scala> val r2 = sc.parallelize(r.collect.reverse.take(3))
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:31


-------------------------------
handling CSV files [ first line is the header ]

[cloudera@quickstart ~]$ gedit prods
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prods spLab

scala> val raw  = sc.textFile("/user/cloudera/spLab/prods")
raw: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/prods MapPartitionsRDD[11] at textFile at <console>:27

scala> raw.collect.foreach(println)
"pid","name","price"
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000

scala> raw.count
res18: Long = 5



to eliminate the first element, slice is used.

 scala> l
res19: List[Int] = List(10, 20, 30, 40, 50, 50, 56, 67)

scala> l.slice(2,5)
res20: List[Int] = List(30, 40, 50)

scala> l.slice(1,l.size)
res21: List[Int] = List(20, 30, 40, 50, 50, 56, 67)

way1:

scala> raw.collect
res29: Array[String] = Array("pid","name","price", p1,Tv,50000, p2,Lap,70000, p3,Ipod,8000, p4,Mobile,9000)

scala> val data = sc.parallelize(raw.collect.slice(1,raw.collect.size))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:29

scala> data.collect.foreach(println)
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000

scala>

 here, slice is not available on an RDD,
 so the data has to be collected to the local client, and then slice applied.
 if the RDD volume is large, the client cannot collect it and the flow will fail.

Way2:
------

 val data = raw.filter(x =>
     !x.contains("pid"))

 data.persist
--adv: no need to collect data into the client [local].

--disadv: to eliminate 1 row, all rows are scanned.
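Way3 (a possible alternative, not from the lab notes): drop only the first line of the first partition with mapPartitionsWithIndex, so neither a client-side collect nor a content match on every row is needed.

// the header is the first line of partition 0; drop it there,
// pass all other partitions through untouched
val data3 = raw.mapPartitionsWithIndex { (idx, it) =>
  if (idx == 0) it.drop(1) else it
}
data3.collect.foreach(println)   // p1,Tv,50000 ... (no header line)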

-----------------------------------------