Data science Software Course Training in Ameerpet Hyderabad

Thursday, 8 June 2017

Hive Partitioned tables [case study]


[cloudera@quickstart ~]$ cat saleshistory
01/01/2011,2000
01/01/2011,3000
01/02/2011,5000
01/02/2011,4000
01/02/2011,1000
01/03/2011,2000
01/25/2011,3000
01/25/2011,5000
01/29/2011,4000
01/29/2011,1000
02/01/2011,2000
02/01/2011,3000
02/02/2011,8000
03/02/2011,9000
03/02/2011,3000
03/03/2011,5000
03/25/2011,7000
03/25/2011,2000
04/29/2011,5000
04/29/2011,3000
05/01/2011,2000
05/01/2011,3000
05/02/2011,5000
05/02/2011,4000
06/02/2011,1000
06/03/2011,2000
06/25/2011,3000
07/25/2011,5000
07/29/2011,4000
07/29/2011,1000
08/01/2011,2000
08/01/2011,3000
08/02/2011,5000
09/02/2011,4000
09/02/2011,1000
09/03/2011,2000
09/25/2011,3000
10/25/2011,5000
10/29/2011,4000
10/29/2011,1000
10/29/2011,5000
11/01/2011,2000
11/01/2011,3000
11/02/2011,5000
11/02/2011,4000
11/02/2011,1000
11/03/2011,2000
11/25/2011,3000
12/25/2011,5000
12/29/2011,4000
12/29/2011,1000
12/30/2011,9000
12/30/2011,40000
[cloudera@quickstart ~]$

create table myraw(dt string, amt int)
      row format delimited
        fields terminated by ',';

load data local inpath 'saleshistory' into table myraw;

create table urraw like myraw;




-----------------

-- synthesize five more years of data (2012..2016):
-- substr(dt,1,9) keeps 'MM/DD/201', and the concat
-- appends the final year digit
insert overwrite table urraw
  select * from (
    select dt, amt from myraw
    union all
    select concat(substr(dt,1,9),'2') as dt, amt+1000 as amt from myraw
    union all
    select concat(substr(dt,1,9),'3') as dt, amt+4000 as amt from myraw
    union all
    select concat(substr(dt,1,9),'4') as dt, amt+500 as amt from myraw
    union all
    select concat(substr(dt,1,9),'5') as dt, amt+8000 as amt from myraw
    union all
    select concat(substr(dt,1,9),'6') as dt, amt+1000 as amt from myraw
  ) s;
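
A quick sanity check after the insert (the subquery unions six branches, so urraw should hold six times the rows of myraw):

select count(*) from urraw;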



create table ourraw(dt array<string> , amt int);

insert overwrite table ourraw
  select split(dt,'/') , amt  from urraw;

create table sales(dt string, amt int);

insert overwrite table sales
  select concat(dt[2],'-',dt[0],'-',dt[1]), amt from
        ourraw;
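
-- here dt[0] is the month, dt[1] the day, dt[2] the year,
-- e.g. dt = ['01','25','2011']  -->  '2011-01-25'
-- (the yyyy-MM-dd format that year(), month(), day() expect)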




create table salespart(dt string, amt int)
   partitioned by (y int, m int, d int)
 row format delimited
   fields terminated by ',';

-- strict mode rejects a fully dynamic partition insert, so switch to nonstrict;
-- the max settings raise the default caps on partition counts
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.dynamic.partitions.pernode=10000;






insert overwrite table salespart
   partition (y, m, d)
  select dt, amt, year(dt), month(dt), day(dt)
        from sales;
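
Once the insert finishes, each (y, m, d) combination becomes its own directory, and filters on the partition columns prune the scan; a quick check, assuming the tables above:

show partitions salespart;

-- reads only the y=2011, m=12 sub-directories
select dt, amt from salespart where y=2011 and m=12;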

   ------ sankara.deva2016@gmail.com
   ------ sub: partitions case study.

Wednesday, 17 May 2017

Pig : Udfs using Python

we can keep multiple functions
  under one program (.py file)

 transform.py
-------------------------
from pig_util import outputSchema

@outputSchema('name:chararray')
def firstUpper(x):
    fc = x[0].upper()
    rc = x[1:].lower()
    return fc + rc

@outputSchema('sex:chararray')
def gender(x):
    if x == 'm':
        return 'Male'
    else:
        return 'Female'

@outputSchema('dname:chararray')
def dept(dno):
    dname = "Others"
    if dno == 11:
        dname = "Marketing"
    elif dno == 12:
        dname = "Hr"
    elif dno == 13:
        dname = "Finance"
    return dname

-----------------
register 'transform.py' using jython as myprog;

res = foreach emp generate
     id, myprog.firstUpper(name) as name,
     sal , myprog.gender(sex) as sex,
       myprog.dept(dno) as dname;
-------------------------------------


pig unions :


 data1 = load 'file1' using PigStorage(',')
   as (name:chararray, city:chararray);

 data2 = load 'file2' using PigStorage(',')
   as (name:chararray, sal:int);

-- pad each relation with a typed null so both match
-- the (name, city, sal) schema expected by union
d1 = foreach data1 generate
        name, city, (int)null as sal;
d2 = foreach data2 generate
        name, (chararray)null as city, sal;

d = union d1, d2;
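
As a side note, Pig (0.8+) can do this null-padding automatically; a minimal sketch with the same inputs:

d_alt = union onschema data1, data2;  -- missing fields become null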




Tuesday, 16 May 2017

Python Examples 1



name = input("Enter name ")
age = input("Enter age")

print(name, " is ", age, " years old ")
-----------------------------------

# if

a = 10
b = 25
if a>b:
  print(a , " is big")
else:
   print(b , " is big ")
-----------------------------
# nested if
a = 10
b = 20
c = 17
big = 0
if a>b:
  if a>c:
    big=a
  else:
    big=c
elif b>c:
  big=b
else:
  big=c
print("Biggest is ", big)
----------------------------------

# if and loop combination:

lst = [10,20,34,23,12,34,23,45]

big = lst[0]

for v in lst:
  if v>big:
     big=v
print("max of the list ", big)

-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = 0
cnt = 0

for v in lst:
   tot += v
   cnt += 1
avg = tot/cnt
print("avgerage : ", avg)
-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = sum(lst)
cnt = len(lst)
avg = tot/cnt
mx = max(lst)   # avoid shadowing the max/min builtins
mn = min(lst)
print("tot : ", tot)
print("count : ", cnt)
print("average ", avg)
print("maximum ", mx)
print("Minimum ", mn)
----------------------------

Accessing List Elements:
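
A minimal sketch of indexing and slicing, using the same list as the other examples:

lst = [10,20,34,23,12,34,23,45]

print(lst[0])     # first element -> 10
print(lst[-1])    # last element -> 45
print(lst[2:5])   # slice of index 2..4 -> [34, 23, 12]
print(lst[1:])    # everything from index 1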



------------------------------

updating List Elements:


lst = [10,20,34,23,12,34,23,45]

lst[3] = 100
print(lst)
lst[4:7] = [250]*3
print(lst)

a = [1]*10
print(a)
--------------------------

adding elements to list:

lst = [10,20,34,23,12]

lst.append(100)
lst.append(200)
print(lst)

-----------------------------

merging lists:

lst = [10,20,34,23,12]

lst2 = [100,200,300,400,500]

l = lst + lst2

lst += lst2
print(l)
print(lst)

---------------------------

Transformations:

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
    names2.append(x.upper())
print(names2)
------------------------------

Transformations:
  first char into upper case,
  and all remaining chars into lower case.

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
     fc = x[0].upper()
     rc = x[1:].lower()
     name=fc+rc
     names2.append(name)
print(names2)
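
The same transformation can also be written as a list comprehension; a minimal equivalent sketch:

names2 = [x[0].upper() + x[1:].lower() for x in names]
print(names2)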
----------------------------------

     sankara.deva2016@gmail.com
-------------------------------------

Spark : Spark streaming and Kafka Integration

steps:

 1) start ZooKeeper server
 2) start Kafka brokers [ one or more ]
 3) create a topic.
 4) start console producer [ to write messages into the topic ]
 5) start console consumer [ to test whether messages are streamed ]
 6) create a spark streaming context,
    which streams from the kafka topic.
 7) perform transformations or aggregations.
 8) output operation : direct the results into another kafka topic.
------------------------------------------

the following code was tested with
  spark 1.6.0 and kafka 0.10.2.0

kafka and spark streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka.KafkaUtils
// 1. receiver-based stream: (ssc, zookeeper, consumer group, topic -> threads)
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
val lines = kafkaStream.map(x => x._2.toUpperCase)

// flatMap (not map), so each word becomes its own record
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x, 1))
val wc = pair.reduceByKey(_+_)

wc.print()
// use below code to write results into kafka topic
ssc.start

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing into kafka topic.

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    // build one producer per partition, not one per record
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { case (w, cnt) =>
      val x = w + "\t" + cnt
      println(x)
      val message = new ProducerRecord[String, String]("results1", null, x)
      producer.send(message)
    }
    producer.close()
  })

-- execute above code before ssc.start.
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning





-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. KafkaUtils.createStream() needs 4 arguments:
    1st ---> streaming context
    2nd ---> ZooKeeper connection details
    3rd ---> consumer group id
    4th ---> topics

spark streaming can read from multiple topics;
the topics are given as key-value pairs of a Map object:

   key   ---> topic name
   value ---> no. of consumer threads

to read from multiple topics,
the 4th argument should be as follows:
    Map("t1"->2, "t2"->4, "t3"->1)

-------------------------

   the given number of consumer threads will be applied on each partition of the kafka topic.

   ex: topic has 3 partitions,
       consumer threads are 5,
   so, total number of threads = 15.

but these 15 threads are not all executed in parallel:
a single createStream() call creates only one receiver,
so at a time only 5 threads (one partition's worth) will be consuming data in parallel.

to make all (15) parallel, create one stream per partition and union them:

val numParts = 3
val kstreams = (1 to numParts).map { _ =>
  KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
val unified = ssc.union(kstreams)

Pig : UDFs


Pig UDFS
----------

  UDF ---> user defined function.

   advantages:
       i)  custom functionalities.
      ii)  reusability.

 Pig UDFs can be developed in
    Java
    Python
    Ruby
    C++
    JavaScript
    Perl

step1:
   Develop udf code.

step2:
   export into jar file
   ex: /home/cloudera/Desktop/pigs.jar

step3:
   register jar file into pig.
 grunt> register Desktop/pigs.jar

step4:
   create a temporary function for the udf class.

 grunt> define  ucase pig.analytics.ConvertUpper();

step5:
  calling the function:

 grunt> e = foreach emp generate
      id, ucase(name) as name, sal,
        ucase(sex) as sex, dno;

 
package pig.analytics;
import .....

// ucase(name) ---> upper-case conversion
public class ConvertUpper extends EvalFunc<String>
{
    public String exec(Tuple v) throws IOException
    {
        String str = (String) v.get(0);
        String res = str.toUpperCase();
        return res;
    }
}
--------------------------
$ cat > samp
100,230,400
123,100,90
140,560,430

$ hadoop fs -copyFromLocal samp piglab

grunt> s = load 'piglab/samp'
            using PigStorage(',')
          as (a:int, b:int, c:int);



package pig.analytics;
....
public class RMax extends EvalFunc<Integer>
{
    public  Integer exec(Tuple v)
     throws IOException
    {
      int a =(Integer) v.get(0);
      int b =(Integer) v.get(1);
      int c =(Integer) v.get(2);

      int big = a; // e.g. 10,20,3
      if (b>big) big = b;
      if (c>big) big = c;
      return  big;
    }
 }

export into jar : Desktop/pigs.jar

grunt> register Desktop/pigs.jar;

grunt> define rmax pig.analytics.RMax();

grunt> res = foreach s generate *,
                   rmax(*) as max;

--------------------------------

 package pig.analytics;
 .......
 public class RowMax
   extends EvalFunc<Integer>
 {
    public Integer exec(Tuple v) throws IOException
    {
     List<Object> lobs = v.getAll() ;
     int max = 0;
     int cnt =0;
    // -20,-3,-40
     for(Object o : lobs)
     {
       cnt++;
       int val = (Integer)o;
       if(cnt==1) max = val;
       max = Math.max(max, val);
     }
     return max;
    }
 }

export into jar : Desktop/pigs.jar
grunt> register Desktop/pigs.jar
grunt> define dynmax pig.analytics.RowMax();
grunt> r = foreach s generate *, dynmax(*) as m;
-----------------------------------------

emp = load 'piglab/emp' using PigStorage(',')
   as (id:int, name:chararray, sal:int,
     sex:chararray, dno:int);

udfs to develop: grade(), dname(), gender()


package pig.analytics;
public class Gender extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     String s =(String) v.get(0);
     s = s.toUpperCase();
     if (s.matches("F"))
       s = "Female";
     else
       s = "Male";
     return s;
 }
}
-----------------

package pig.analytics;
public class Grade extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     int sal = (Integer) v.get(0);
     String grade;
     if (sal >= 70000)
        grade = "A";
     else if (sal >= 50000)
        grade = "B";
     else if (sal >= 30000)
        grade = "C";
     else
        grade = "D";
     return grade;
 }
}
------
package pig.analytics;
public class DeptName extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
    int dno = (Integer)v.get(0);
    String dname;
    switch (dno){
    case 11 :
          dname = "Marketing";
          break;
    case 12 :
          dname = "HR";
          break;
    case 13 :
          dname = "Finance";
          break;
    default:
          dname = "Others";
     }
    return dname;  
 }
}
---------------------------------
export into jar : Desktop/pigs.jar;
grunt> register Desktop/pigs.jar;
grunt> define gender pig.analytics.Gender();
grunt> define grade pig.analytics.Grade();
grunt> define dept pig.analytics.DeptName();

grunt> res = foreach emp generate
    id, ucase(name) as name,
     sal, grade(sal) as grade,
    gender(sex) as sex,
    dept(dno) as dname ;
---------------------------------

Pig : Cross Operator for Cartesian Product


 Cross:
 -----
   used for Cartesian product.

   each element of the left set joins with each element of the right set.


  ds1 --> (a)
          (b)
          (c)

  ds2 --> (1)
          (2)

  x = cross ds1, ds2;

   (a,1)
   (a,2)
   (b,1)
   (b,2)
   (c,1)
   (c,2)


emp = load 'piglab/emp' using PigStorage(',')
    as (id:int, name:chararray, sal:int,
  sex:chararray, dno:int);


task:
   find how many employees are below the avg sal,
   and how many are above it.

sals = foreach emp generate sal;

grp = group sals all;

avgsal = foreach grp generate AVG(sals.sal) as avg;
avgsal = foreach avgsal generate (int)avg;


e = cross sals, avgsal;
e = foreach e generate sals::sal as sal, avgsal::avg as avg;


stats = foreach e generate
       (sal>=avg ? 'Above':'Below') as stat;
grp2 = group stats by stat;
res = foreach grp2 generate group as stat,
     COUNT(stats) as cnt;
dump res
(Above,2)
(Below,6)


-- in the above task, cross is used to make avgsal available on every employee row,
--   so that we can compare each sal with the avg.

--------------------------------------

2nd example:


[cloudera@quickstart ~]$ cat sales
01/01/2016,40000
01/03/2016,50000
01/25/2016,50000
02/01/2016,40000
02/03/2016,90000
02/25/2016,50000
03/01/2016,40000
03/03/2016,50000
04/25/2016,50000
05/01/2016,40000
05/03/2016,50000
06/25/2016,50000
06/01/2016,40000
06/03/2016,90000
06/25/2016,50000
07/01/2016,40000
07/03/2016,50000
07/25/2016,50000
08/01/2016,40000
09/03/2016,50000
09/25/2016,50000
10/01/2016,40000
10/03/2016,90000
10/25/2016,50000
10/01/2016,40000
11/03/2016,50000
12/25/2016,50000
12/01/2016,40000
12/03/2016,50000
12/25/2016,50000
12/01/2016,40000
12/03/2016,90000
12/25/2016,50000
12/01/2016,40000
12/03/2016,50000
12/25/2016,50000
[cloudera@quickstart ~]$

[cloudera@quickstart ~]$ hadoop fs -copyFromLocal sales piglab

monthly sales report.

sales = load 'piglab/sales'
   using PigStorage(',')
    as (dt:chararray, amt:int);

sales2 = foreach sales generate
     SUBSTRING(dt,0,2) as mon, amt;
s = foreach sales2 generate (int)mon, amt;

grp = group s by mon;
mrep = foreach grp generate group as mon,
              SUM(s.amt) as tot;

--quarterly sales report:

q = foreach mrep generate
       (mon <4 ? 1:
         (mon <7 ? 2:
           (mon <10 ? 3:4))) as qtr, tot;
qgrp = group q by qtr;
qrep = foreach qgrp generate
         group as qtr, SUM(q.tot) as tot;
dump qrep
(1,410000)
(2,370000)
(3,280000)
(4,780000)


-- cross needs two distinct aliases, so take a copy of qrep
qrep2 = foreach qrep generate *;

cr = cross qrep, qrep2;
cr = foreach cr generate
      qrep::qtr as q1, qrep2::qtr as q2,
      qrep::tot as tot1, qrep2::tot as tot2;

-- keep only consecutive quarter pairs
cr2 = filter cr by (q1-q2)==1;

rep = foreach cr2 generate *,
       ((tot1-tot2)*100)/tot2 as pgrowth;  -- % growth over previous quarter

dump rep;
--------------------------------

[cloudera@quickstart ~]$ cat matri
101,Amar,25,40000,hyd,m
102,Amala,23,50000,Del,f
103,Kiran,29,50000,hyd,m
104,Samantha,26,30000,hyd,f
105,Mani,30,70000,Del,m
106,Rakhul,24,40000,Del,f
107,Venu,34,100000,Pune,m
108,Ileana,29,200000,hyd,f

[cloudera@quickstart ~]$

hadoop fs -copyFromLocal matri piglab

applicants = load 'piglab/matri'
    using PigStorage(',')
    as (id:int, name:chararray,
       age:int, income:int, city:chararray,
        sex:chararray);

males = filter applicants by sex=='m';
fems = filter applicants by sex=='f';

mf = cross males, fems;

mf = foreach mf generate males::name as mname,
           fems::name as fname,
         males::age as mage,
         fems::age as fage;

res1 = filter mf by (mage>fage and mage-fage<4);

mlist = foreach res1 generate
           mname as src , fname as trg,
           mage as srcage, fage as trgage;
flist = foreach res1 generate
          fname as src, mname as trg,
           fage as srcage, mage as trgage;

list = union mlist, flist;




dump res1;

Pig : Joins

[cloudera@quickstart ~]$ hadoop fs -cat spLab/e
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
109,jj,10000,m,14
110,kkk,20000,f,15
111,dddd,30000,m,15
[cloudera@quickstart ~]$ hadoop fs -cat spLab/d
11,marketing,hyd
12,hr,del
13,fin,del
21,admin,hyd
22,production,del
[cloudera@quickstart ~]$
$ cat > joins.pig

 emp = load 'spLab/e' using PigStorage(',')
    as (id:int, name:chararray, sal:int,
       sex:chararray, dno:int);

 dept = load 'spLab/d' using PigStorage(',')
   as (dno:int, dname:chararray, dloc:chararray);

 ij = join  emp by dno, dept by dno;

 lj = join emp by dno left outer, dept by dno;

 rj = join emp by dno right outer, dept by dno;

 fj = join emp by dno full outer, dept by dno;


ctrl+d [to save]

grunt> run joins.pig
grunt> describe ij
ij: {emp::id: int,emp::name: chararray,emp::sal: int,emp::sex: chararray,emp::dno: int,dept::dno: int,dept::dname: chararray,dept::dloc: chararray}
grunt> describe fj
fj: {emp::id: int,emp::name: chararray,emp::sal: int,emp::sex: chararray,emp::dno: int,dept::dno: int,dept::dname: chararray,dept::dloc: chararray}
grunt> ed = foreach fj generate
>>    emp::id as id, emp::name as name, emp::sal as sal,
>>     emp::sex as sex, dept::dname as dname, dept::dloc as city;
-- flattening the schema: dropping the :: reference operators.

grunt> dump ed
(108,iiii,50000,m,marketing,hyd)
(101,aaaa,40000,m,marketing,hyd)
(106,dkd,40000,m,hr,del)
(105,ee,10000,m,hr,del)
(103,cccc,50000,m,hr,del)
(102,bbbbbb,50000,f,hr,del)
(107,sdkfj,80000,f,fin,del)
(104,dd,90000,f,fin,del)
(109,jj,10000,m,,)
(111,dddd,30000,m,,)
(110,kkk,20000,f,,)
(,,,,admin,hyd)
(,,,,production,del)

-----------------------------

grunt> ed1 = foreach fj generate
>>       emp::dno as dno1, dept::dno as dno2, emp::sal as sal;
grunt> describe ed1
ed1: {dno1: int,dno2: int,sal: int}
grunt> stats = foreach ed1 generate
>>   (dno1 is not null and dno2 is not null ? 'Working' :
>>     (dno2 is null ? 'BenchTeam' : 'BenchProject')) as stat, sal;
grunt> grp = group stats by stat;
grunt> res = foreach grp generate
>>     group as stat, SUM(stats.sal) as tot;
grunt> r = foreach res generate stat,
>>             (tot is null ? 0:tot) as tot;

grunt> dump r;
(Working,410000)
(BenchTeam,60000)
(BenchProject,0)


-----------------------------------------

to get the top 3 salaries:


 sals = foreach emp generate sal;
 sals2 = distinct sals;
 sals3 = order sals2 by sal desc;
 sals4 = limit sals3 3;

 e = join emp by sal, sals4 by sal;
 etop3 = foreach e generate
     emp::id as id, emp::name as name,
        emp::sal as sal;
 dump etop3;

-------------------------------

 emp ---> id name sal sex dno
 dept --> dno dname dloc mid
 mngrs ---> mid mname phone


  what is the total salary budget for each manager?


[cloudera@quickstart ~]$ hadoop fs -copyFromLocal dept spLab/dd
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal mngrs spLab
[cloudera@quickstart ~]$

dep = load 'spLab/dd' using PigStorage(',')
   as (dno:int, dname:chararray, dloc:chararray,
  mid:chararray);

mgr = load 'spLab/mngrs' using PigStorage(',')
   as (mid:chararray, mname:chararray,
       ph:chararray);

edep = join emp by dno, dep by dno;

edep2 = foreach edep generate
         emp::sal as sal, dep::mid as mid;

edm = join edep2 by mid, mgr by mid;
edm2 = foreach edm generate
        mgr::mname as mname ,
          edep2::sal as sal;

grpx = group edm2 by mname;
resx = foreach grpx generate
       group as mname,
          SUM(edm2.sal) as tot;
dump resx
------------------------------------