Data science Software Course Training in Ameerpet Hyderabad

Wednesday, 17 May 2017

Pig : UDFs using Python

we can keep multiple functions
  in one program (.py file)

 transform.py
-------------------------
from pig_util  import outputSchema
@outputSchema("name:chararray")
def  firstUpper(x):
   fc = x[0].upper()
   rc = x[1:].lower()
   n = fc+rc
   return n
@outputSchema("sex:chararray")
def  gender(x):
   if x=='m':
      x = 'Male'
   else:
      x = 'Female'
   return x

@outputSchema("dname:chararray")
def dept(dno):
   dname="Others"
   if dno==11:
       dname = 'Marketing'
   elif dno==12:
       dname="Hr"
   elif dno==13:
      dname="Finance"
   return dname
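
# A more compact alternative, sketched with a plain dict lookup.
# dept2 and DEPT_NAMES are illustrative names only, not part of the original script.
DEPT_NAMES = {11: 'Marketing', 12: 'Hr', 13: 'Finance'}

@outputSchema("dname:chararray")
def dept2(dno):
   return DEPT_NAMES.get(dno, "Others")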

-----------------
register 'transform.py' using jython
  as myprog;

res = foreach emp generate
     id, myprog.firstUpper(name) as name,
     sal , myprog.gender(sex) as sex,
       myprog.dept(dno) as dname;
-------------------------------------


pig unions :


 data1 = load 'file1' using PigStorage(',')
   as (name:chararray, city:chararray);

 data2 = load 'file2' using PigStorage(',')
   as (name:chararray, sal:int);

d1 = foreach data1 generate
        name, city, (int)null as sal ;
d2 = foreach data2 generate
        name, (chararray)null as city, sal;

d = union d1, d2;




Tuesday, 16 May 2017

Python Examples 1



name = input("Enter name ")
age = input("Enter age")

print(name, " is ", age, " years old ")
-----------------------------------

# if

a = 10
b = 25
if a>b:
  print(a , " is big")
else:
   print(b , " is big ")
-----------------------------
# nested if
a = 10
b = 20
c = 17
big = 0
if a>b:
  if a>c:
    big=a
  else:
    big=c
elif b>c:
  big=b
else:
  big=c
print("Biggest is ", big)
----------------------------------

# if and loop combination:

lst = [10,20,34,23,12,34,23,45]

big = lst[0]

for v in lst:
  if v>big:
     big=v
print("max of the list ", big)

-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = 0
cnt = 0

for v in lst:
   tot += v
   cnt += 1
avg = tot/cnt
print("avgerage : ", avg)
-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = sum(lst)
cnt  = len(lst)
avg = tot/cnt
mx = max(lst)   # avoid naming variables max/min: that shadows the built-in functions
mn = min(lst)
print("tot : ", tot)
print("count : ", cnt)
print("average ", avg)
print("maximum ", mx)
print("Minimum ", mn)
----------------------------

Accessing List Elements:
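
Elements are read by position (0-based index) and by slice; a quick sketch
using the same list as above:

lst = [10,20,34,23,12,34,23,45]

print(lst[0])      # first element  -> 10
print(lst[-1])     # last element   -> 45
print(lst[2:5])    # slice of elements 2,3,4 -> [34, 23, 12]
print(lst[:3])     # first three elements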



------------------------------

updating List Elements:


lst = [10,20,34,23,12,34,23,45]

lst[3] = 100          # replace a single element
print(lst)
lst[4:7] = [250]*3    # replace a slice with three 250s
print(lst)

a = [1]*10
print(a)
--------------------------

adding elements to list:

lst = [10,20,34,23,12]

lst.append(100)
lst.append(200)
print(lst)

-----------------------------

merging lists:

lst = [10,20,34,23,12]

lst2 = [100,200,300,400,500]

l = lst + lst2

lst += lst2
print(l)
print(lst)
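
As an alternative to +=, the list's extend() method appends all elements of
another list in place:

lst = [10,20,34,23,12]
lst.extend([100,200,300,400,500])
print(lst)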

---------------------------

Transformations:

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
    names2.append(x.upper())
print(names2)
------------------------------

Transformations:
  first character to upper case,
  and all remaining characters to lower case.

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
     fc = x[0].upper()
     rc = x[1:].lower()
     name=fc+rc
     names2.append(name)
print(names2)
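
The same transformation can be written more compactly with a list
comprehension; str.capitalize() upper-cases the first character and
lower-cases the rest:

names2 = [x.capitalize() for x in names]
print(names2)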
----------------------------------

     sankara.deva2016@gmail.com
-------------------------------------














 
   





Spark : Spark streaming and Kafka Integration

steps:

 1)  start ZooKeeper server
 2)  start Kafka brokers [ one or more ]
 3)  create a topic.
 4)  start console producer [ to write messages into the topic ]
 5)  start console consumer [ to test whether messages are streamed ]
 6) create spark streaming context,
    which streams from kafka topic.
 7) perform transformations or aggregations
 8) output operation : which will direct the results into another kafka topic.
------------------------------------------





   

 


the following code was tested with
  Spark 1.6.0 and Kafka 0.10.2.0

kafka and spark streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka.KafkaUtils
//1.
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
val lines = kafkaStream.map(x => x._2.toUpperCase)

// flatMap (not map) so each word becomes its own record
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val wc = pair.reduceByKey(_+_)

wc.print()
// use below code to write results into kafka topic
ssc.start

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing into kafka topic.

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>

                partition.foreach{
                  case (w, cnt) => {
                    val x = w + "\t" + cnt
                    val props = new HashMap[String, Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            "localhost:9092")
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")

                    println(x)
                    val producer = new KafkaProducer[String,String](props)
                    val message=new ProducerRecord[String, String]("results1",null,x)
                    producer.send(message)
                  }
                }))

-- execute above code before ssc.start.
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning





-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. KafkaUtils.createStream() needs 4 arguments:
    1st ---> streaming context
    2nd ---> zookeeper connection details
    3rd ---> consumer group id
    4th ---> topics
 
spark streaming can read from multiple topics.
    each topic should be given as a key-value pair in a Map object

key ---> topic name
value ---> no.of consumer threads.

 to read from multiple topics,
 the 4th argument should be as follows.
    Map("t1"->2,"t2"->4,"t3"->1)

-------------------------

   the given number of consumer threads is applied to each partition of the kafka topic.

   ex: the topic has 3 partitions,
        consumer threads are 5.
   so, total number of threads = 15.

but these 15 threads are not executed in parallel.

at a time, only the 5 threads of one partition consume data in parallel.

to make all (15) run in parallel, create one stream per partition and union them:

val numparts = 3
val kstreams = (1 to numparts).map { x =>
  KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
val unified = ssc.union(kstreams)   // union the per-partition receivers into one DStream


















Pig : UDFs


Pig UDFS
----------

  UDF ---> user defined functions.
 
   adv:
       i)  custom functionalities.
      ii)  reusability.

 Pig UDFs can be developed in
    java
   python
    ruby
    c++
    javascript
    perl

step1:
   Develop udf code.

step2:
   export into jar file
   ex: /home/cloudera/Desktop/pigs.jar

step3:
   register jar file into pig.
 grunt> register Desktop/pigs.jar

step4:
   create a temporary function for the udf class.

 grunt> define  ucase pig.analytics.ConvertUpper();

step5:
  calling the function:

 grunt>e =  foreach emp generate
      id, ucase(name) as name, sal,
        ucase(sex) as sex, dno;

 
package  pig.analytics;
import .....

--> ucase(name) ---> upper conversion

public class ConvertUpper extends EvalFunc<String>
  {
     public String exec(Tuple v)
      throws IOException
     {
        String str = (String)v.get(0);
        String res = str.toUpperCase();
        return res;
           
     }

 
 }
--------------------------
$ cat > samp
100,230,400
123,100,90
140,560,430

$ hadoop fs -copyFromLocal samp piglab

grunt> s = load 'piglab/samp'
            using PigStorage(',')
          as (a:int, b:int, c:int);



package pig.analytics;
....
public class RMax extends EvalFunc<Integer>
{
    public  Integer exec(Tuple v)
     throws IOException
    {
      int a =(Integer) v.get(0);
      int b =(Integer) v.get(1);
      int c =(Integer) v.get(2);

      int big = a;        // start with the first value
      if (b>big) big = b;
      if (c>big) big = c;
      return  big;
    }
 }

export into jar : Desktop/pigs.jar

grunt> register Desktop/pigs.jar;

grunt> define rmax pig.analytics.RMax();

grunt> res = foreach s generate *,
                   rmax(*) as max;

--------------------------------

 package pig.analytics;
 .......
 public class RowMax
   extends EvalFunc<Integer>
 {
    public Integer exec(Tuple v) throws IOException
    {
     List<Object> lobs = v.getAll() ;
     int max = 0;
     int cnt =0;
    // -20,-3,-40
     for(Object o : lobs)
     {
       cnt++;
       int val = (Integer)o;
       if(cnt==1) max = val;
       max = Math.max(max, val);
     }
     return max;
    }
 }

export into jar : Desktop/pigs.jar
grunt> register Desktop/pigs.jar
grunt> define dynmax pig.analytics.RowMax();
grunt> r = foreach s generate *, dynmax(*) as m;
-----------------------------------------

emp = load 'piglab/emp' using PigStorage(',')
   as (id:int, name:chararray, sal:int,
     sex:chararray, dno:int);

grade()
dname()
gender()


package pig.analytics;
public class Gender extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     String s =(String) v.get(0);
     s = s.toUpperCase();
     if (s.matches("F"))
       s = "Female";
     else
       s = "Male";
     return s;
 }
}
-----------------

package pig.analytics;
public class Grade extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     int sal = (Integer) v.get(0);   // sal is a number, not a String
     String grade;
     if (sal>=70000)
       grade="A";
     else if (sal>=50000)
       grade="B";
     else if (sal>=30000)
       grade="C";
     else
       grade="D";
     return grade;
 }
}
------
package pig.analytics;
public class DeptName extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
    int dno = (Integer)v.get(0);
    String dname;
    switch (dno){
    case 11 :
          dname = "Marketing";
          break;
    case 12 :
          dname = "HR";
          break;
    case 13 :
          dname = "Finance";
          break;
    default:
          dname = "Others";
     }
    return dname;  
 }
}
---------------------------------

---------------------------
export into jar : Desktop/pigs.jar;
grunt> register Desktop/pigs.jar;
grunt> define gender pig.analytics.Gender();
grunt> define grade pig.analytics.Grade();
grunt> define dept pig.analytics.DeptName();

grunt> res = foreach emp generate
    id, ucase(name) as name,
     sal, grade(sal) as grade,
    gender(sex) as sex,
    dept(dno) as dname ;
---------------------------------











     
         







Pig : Cross Operator for Cartesian Product


 Cross:
 -----
   used for Cartesian product.

   each element of the left set joins with each element of the right set.


  ds1 --> (a)
          (b)
          (c)

  ds2 --> (1)
          (2)

  x = cross ds1, ds2;

   (a,1)
   (a,2)
   (b,1)
   (b,2)
   (c,1)
   (c,2)


emp = load 'piglab/emp' using PigStorage(',')
    as (id:int, name:chararray, sal:int,
  sex:chararray, dno:int);


task:
   find how many employees are below the avg sal,
   and how many are above the avg sal.

sals = foreach emp generate sal;

grp = group sals all;

avgsal = foreach grp generate AVG(sals.sal) as avg;
avgsal = foreach avgsal generate (int)avg;


e = cross sals, avgsal;
e = foreach e generate sals::sal as sal, avgsal::avg as avg;


stats = foreach e generate
       (sal>=avg ? 'Above':'Below') as stat;
grp2 = group stats by stat;
res = foreach grp2 generate group as stat,
     COUNT(stats) as cnt;
dump res
(Above,2)
(Below,6)


-- in the above task, cross is used to make avgsal available on every employee row,
   so that each sal can be compared with the avg.

--------------------------------------

2nd example:


[cloudera@quickstart ~]$ cat sales
01/01/2016,40000
01/03/2016,50000
01/25/2016,50000
02/01/2016,40000
02/03/2016,90000
02/25/2016,50000
03/01/2016,40000
03/03/2016,50000
04/25/2016,50000
05/01/2016,40000
05/03/2016,50000
06/25/2016,50000
06/01/2016,40000
06/03/2016,90000
06/25/2016,50000
07/01/2016,40000
07/03/2016,50000
07/25/2016,50000
08/01/2016,40000
09/03/2016,50000
09/25/2016,50000
10/01/2016,40000
10/03/2016,90000
10/25/2016,50000
10/01/2016,40000
11/03/2016,50000
12/25/2016,50000
12/01/2016,40000
12/03/2016,50000
12/25/2016,50000
12/01/2016,40000
12/03/2016,90000
12/25/2016,50000
12/01/2016,40000
12/03/2016,50000
12/25/2016,50000
[cloudera@quickstart ~]$

[cloudera@quickstart ~]$ hadoop fs -copyFromLocal sales piglab

monthly sales report.

sales = load 'piglab/sales'
   using PigStorage(',')
    as (dt:chararray, amt:int);

sales2 = foreach sales generate
     SUBSTRING(dt,0,2) as mon, amt;
s = foreach sales2 generate (int)mon, amt;

grp = group s by mon;
mrep = foreach grp generate group as mon,
              SUM(s.amt) as tot;

--quarterly sales report:

q = foreach mrep generate
       (mon <4 ? 1:
         (mon <7 ? 2:
           (mon <10 ? 3:4))) as qtr, tot;
qgrp = group q by qtr;
qrep = foreach qgrp generate
         group as qtr, SUM(q.tot) as tot;
dump qrep
(1,410000)
(2,370000)
(3,280000)
(4,780000)


qrep2 = foreach qrep generate *;

cr = cross qrep, qrep2;
cr = foreach cr generate
      qrep::qtr as q1, qrep2::qtr as q2,
      qrep::tot as tot1, qrep2::tot as tot2;

cr2 = filter cr by (q1-q2)==1;

rep = foreach cr2 generate *,
       ((tot1-tot2)*100)/tot2 as pgrowth;

dump rep;
--------------------------------

[cloudera@quickstart ~]$ cat matri
101,Amar,25,40000,hyd,m
102,Amala,23,50000,Del,f
103,Kiran,29,50000,hyd,m
104,Samantha,26,30000,hyd,f
105,Mani,30,70000,Del,m
106,Rakhul,24,40000,Del,f
107,Venu,34,100000,Pune,m
108,Ileana,29,200000,hyd,f

[cloudera@quickstart ~]$

hadoop fs -copyFromLocal matri piglab

applicants = load 'piglab/matri'
    using PigStorage(',')
    as (id:int, name:chararray,
       age:int, income:int, city:chararray,
        sex:chararray);

males = filter applicants by sex=='m';
fems = filter applicants by sex=='f';

mf = cross males, fems;

mf = foreach mf generate males::name as mname,
           fems::name as fname,
         males::age as mage,
         fems::age as fage;

res1 = filter mf by (mage>fage and mage-fage<4);

mlist = foreach res1 generate
           mname as src , fname as trg,
           mage as srcage, fage as trgage;
flist = foreach res1 generate
          fname as src, mname as trg,
           fage as srcage, mage as trgage;

list = union mlist, flist;




dump res1;









   





     

















Pig : Joins

[cloudera@quickstart ~]$ hadoop fs -cat spLab/e
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
109,jj,10000,m,14
110,kkk,20000,f,15
111,dddd,30000,m,15
[cloudera@quickstart ~]$ hadoop fs -cat spLab/d
11,marketing,hyd
12,hr,del
13,fin,del
21,admin,hyd
22,production,del
[cloudera@quickstart ~]$
$ cat > joins.pig

 emp = load 'spLab/e' using PigStorage(',')
    as (id:int, name:chararray, sal:int,
       sex:chararray, dno:int);

 dept = load 'spLab/d' using PigStorage(',')
   as (dno:int, dname:chararray, dloc:chararray);

 ij = join  emp by dno, dept by dno;

 lj = join emp by dno left outer, dept by dno;

 rj = join emp by dno right outer, dept by dno;

 fj = join emp by dno full outer, dept by dno;


ctrl+d [to save]

grunt> run joins.pig
grunt> describe ij
ij: {emp::id: int,emp::name: chararray,emp::sal: int,emp::sex: chararray,emp::dno: int,dept::dno: int,dept::dname: chararray,dept::dloc: chararray}
grunt> describe fj
fj: {emp::id: int,emp::name: chararray,emp::sal: int,emp::sex: chararray,emp::dno: int,dept::dno: int,dept::dname: chararray,dept::dloc: chararray}
grunt> ed = foreach fj generate
>>    emp::id as id, emp::name as name, emp::sal as sal,
>>     emp::sex as sex, dept::dname as dname, dept::dloc as city;
-- simplifying the schema: removing the :: reference operators from the field names.

grunt> dump ed
(108,iiii,50000,m,marketing,hyd)
(101,aaaa,40000,m,marketing,hyd)
(106,dkd,40000,m,hr,del)
(105,ee,10000,m,hr,del)
(103,cccc,50000,m,hr,del)
(102,bbbbbb,50000,f,hr,del)
(107,sdkfj,80000,f,fin,del)
(104,dd,90000,f,fin,del)
(109,jj,10000,m,,)
(111,dddd,30000,m,,)
(110,kkk,20000,f,,)
(,,,,admin,hyd)
(,,,,production,del)

-----------------------------

grunt> ed1 = foreach fj generate
>>       emp::dno as dno1, dept::dno as dno2, emp::sal as sal;
grunt> describe ed1
ed1: {dno1: int,dno2: int,sal: int}
grunt> stats = foreach ed1 generate
>>   (dno1 is not null and dno2 is not null ? 'Working' :
>>     (dno2 is null ? 'BenchTeam' : 'BenchProject')) as stat, sal;
grunt> grp = group stats by stat;
grunt> res = foreach grp generate
>>     group as stat, SUM(stats.sal) as tot;
grunt> r = foreach res generate stat,
>>             (tot is null ? 0:tot) as tot;

grunt> dump r;
(Working,410000)
(BenchTeam,60000)
(BenchProject,0)


-----------------------------------------

to get top 3 salaries.


 sals = foreach emp generate sal;
 sals2 = distinct sals;
 sals3 = order sals2 by sal desc;
 sals4 = limit sals3 3;

 e = join emp by sal, sals4 by sal;
 etop3 = foreach e generate
     emp::id as id, emp::name as name,
        emp::sal as sal;
 dump etop3;

-------------------------------

 emp ---> id name sal sex dno
 dept --> dno dname dloc mid
 mngrs ---> mid  mname  phone


  what is the total salary budget for each manager?


[cloudera@quickstart ~]$ hadoop fs -copyFromLocal dept spLab/dd
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal mngrs spLab
[cloudera@quickstart ~]$

dep = load 'spLab/dd' using PigStorage(',')
   as (dno:int, dname:chararray, dloc:chararray,
  mid:chararray);

mgr = load 'spLab/mngrs' using PigStorage(',')
   as (mid:chararray, mname:chararray,
       ph:chararray);

edep = join emp by dno, dep by dno;

edep2 = foreach edep generate
         emp::sal as sal, dep::mid as mid;

edm = join edep2 by mid, mgr by mid;
edm2 = foreach edm generate
        mgr::mname as mname ,
          edep2::sal as sal;

grpx = group edm2 by mname;
resx = foreach grpx generate
       group as mname,
          SUM(edm2.sal) as tot;
dump resx
------------------------------------






 








 




















Pig : Order [ Sorting ] , exec, run , pig


 order :-
   to sort data (tuples) in ascending or descending order.

 emp = load 'piglab/emp'
     using PigStorage(',')
     as (id:int, name:chararray,
    sal:int, sex:chararray, dno:int);

 e1 = order emp by name;
 e2 = order emp by sal desc;
 e3 = order emp by sal desc, sex, dno desc;

 ---------------------------------------
sql:
   select * from emp order by sal desc limit 3;

 e = order emp by sal desc;
 top3 = limit e 3;


limitation:

 101,aaa,30000,.....
 102,bbb,90000,....
 103,cccc,90000,....
 104,dd,90000,....
 105,ee,80000,.....

 the above process is correct
  only if there are no duplicate salaries.





--------------------------
 sql:
   select dno, sum(sal) as tot from emp
     group by dno
     order by tot desc
     limit 3;


 e =  foreach emp generate dno, sal;
 grp = group e by dno;
 res = foreach grp generate
         group as dno, SUM(e.sal) as tot;
 sorted = order res by tot desc;
 top3 = limit sorted 3;

limitation:

 dump res
 11,2l
 12,3l
 13,3l
 14,3l
 15,3l
 16,0.5l

 the above process is correct
  only if there are no duplicate totals across dnos.

solutions to the above two problems:
 i)  udf
 ii) joins

-------------------------------------

 executing scripts in pig.
--------------------------
 3 commands :
 i)pig
 ii) exec
 iii) run

Pig:
  to execute a script file from the command prompt.
  aliases of relations are not available in the
   grunt shell.

 [cloudera@quickstart ~]$ pig  script1.pig

exec:
  to execute a script from the grunt shell.
 aliases of relations are still not available,
 so we cannot reuse them.

grunt> exec script1.pig

run:

 to execute a script from the grunt shell.
 aliases of relations are available in grunt,
 so you can reuse them.

grunt> run script1.pig

--------------------------------------

 command        environment          aliases available in grunt
 -------        -----------          --------------------------
 pig            command prompt       no
 exec           grunt                no
 run            grunt                yes
-----------------------------------------

when to use what:

  to deploy scripts from a shell script or oozie,
  use "pig".

  to execute from grunt without overriding aliases of
   existing flows, use "exec".

  to execute from grunt and reuse the aliases,
  use "run".

---------------------------------