Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Tuesday 1 August 2017

Hive(10AmTo1:00Pm) Lab1 notes : Hive Inner and External Tables

hive> create table samp1(line string);
-- here we did not select any database.
   default database in hive is "default".
 
  the hdfs location of default database is
   /user/hive/warehouse

 -- when you create a table in default database, under warehouse location, one directory will be created with table name.

   in  hdfs,
   /user/hive/warehouse/samp1 directory is created.

hive> create database mydb;

  when a database is created, in warehouse location, with name database and extension ".db" , one directory will be created.

 How to select database:

hive> use mydb;

hive> create table test1(line string);

  under mydb.db directory, test1 table directory will be created.

 /user/hive/warehouse/mydb.db/test1.

[cloudera@quickstart ~]$ ls file*
file1  file2  file3
[cloudera@quickstart ~]$ cat file1
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
[cloudera@quickstart ~]$ cat file2
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb
[cloudera@quickstart ~]$ cat file3
cccccccccccccccccccc
ccccccccccccccccccc
ccccccccccccc
[cloudera@quickstart ~]$

hive> use default;
hive> load data local inpath 'file1'
      into table samp1;
-- when you  load file into table,
    the file will be copied into table's backend directory.

   in hdfs,
  /user/hive/warehouse/samp1/file1

hive> load data local inpath 'file2'
   into table samp1;

  now table directory has two files,
     file1 and file2.
hive> select * from samp1;
o/p:
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb


-- hive will read all rows of all files of
   table directory.

another way of loading file into table.

$ hadoop fs -copyFromLocal file3
          /user/hive/warehouse/samp1



hive> select * from samp1;
OK
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb
cccccccccccccccccccc
ccccccccccccccccccc
ccccccccccccc

hive> use mydb;
hive> show tables;
test1
hive> load data local inpath 'file1' into table test1;
hive>
in hdfs,
 /user/hive/warehouse/mydb.db/test1/file1

===============================

  Hive tables are basically two types.

 1) Inner tables [user managed tables]
 2) External tables.

 when inner table is dropped,
   both metadata and data(from hdfs) will be deleted.

 when external table is dropped ,
   only metadata will be deleted,
   but still data is safely available in hdfs table's backend location.

  so that you can reuse data in future.


where hive tables metadata will be stored.
  -- in rdbms,
    under  metastore database.

 when you submit a query in hive,
   hive will contact metastore, and indentify table's backend hdfs location, and reads data.

by default every table is inner table. [managed table].

 to create external table.

hive> create external table etab1(line string);

hive>load data local inpath 'file1'
      into table etab1;

hive> load data local inpath 'file2'
    into table etab1;

 now etab1 is created under mydb database,
  under etab1 table directory we have 3 files.

   these file locations will be updated hive metastore(in rdbms).

 when this table is dropped from hive..

hive> drop table etab1;

  -- from rdbms , metadata of this table will be deleted.
 -- but still in hdfs, the table directory and its files are available.
  [ data is not lost]

 so that , in future, hive or other ecosystem can use this data. [adv: reusability]

How to reuse it.
----------------

hive> use mydb;
hive> create table etab1(line string);
hive> select * from etab1;
aaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbb

-- when you create etab1,
  in hdfs , under database location,
  one directory should be created .
  but  under /user/hive/warehouse/mydb.db,
   already etab1 is existed with two files.

 if directory existed, hive will use it,
   if not existed, hive will create it.

============

hive> drop table etab1;
 -- both data and metadata will be deleted.
  bcoz, this time we create etab1 as "inner" table.


=================

Both inner and external tables can use
   custom hdfs locations.

hive> create table mytab(line string)
      location '/user/mydata';

 in hdfs,
   /user/mydata directory will be created
hive> load data local inpath 'file1'
      into table mytab;

  now file1 will be copied into /user/mydata.


hive> drop table mytab;
 here mytab is created as inner table.
  so both metadata and data (/user/mydata)
   will be deleted

hive> create external table urtab(line string)
   location '/user/urdata';

 now in hdfs,
   /user/urdata  directory will be created.


hive> load data local inpath 'file1'
     into table urtab;

hive> load data local inpath 'file2'
     into table urtab;

hive> load data local inpath 'file3'
     into table urtab;

now file1,2,3 will be copied into
   /user/urdata directory of hdfs.

hive> drop table urtab;
 -- only metadata from rdbms will be deleted.  still /user/urdata directory is available with file1,2,3 files.


Reusing next time:

hive> create table ourtab(line string)
     location '/user/urdata';

  here /user/urdata is already existed in hdfs. so hive will use it. if not existed hive will create it.

=====================================





















   






















   



















 


Wednesday 19 July 2017

Pig Video Lessons

Pig class Links:
PigLab1 Video:
https://drive.google.com/file/d/0B6ZYkhJgGD6XTzVHbzBYUFY0a1k/view?usp=sharing

PigLab Notes1:
https://drive.google.com/file/d/0B6ZYkhJgGD6XeU9tUF9aS3QxUWc/view?usp=sharing

PigLab2 Video:
https://drive.google.com/file/d/0B6ZYkhJgGD6XNnhvZUN5eTJSaHM/view?usp=sharing

PigLab2 Notes:
https://drive.google.com/file/d/0B6ZYkhJgGD6Xd0ZHb1hWZVhjbmc/view?usp=sharing

PigLab3 Video:
https://drive.google.com/file/d/0B6ZYkhJgGD6XY3ZTWFFZZ3VMcnM/view?usp=sharing

PigLab3 Notes:
https://drive.google.com/file/d/0B6ZYkhJgGD6Xb1k1aklZOXdjaUE/view?usp=sharing

PigLab4 Video[its audio]:
https://drive.google.com/file/d/0B6ZYkhJgGD6XWHBoODhPTzJ3LVE/view?usp=sharing

PigLab4 Notes :

https://drive.google.com/file/d/0B6ZYkhJgGD6XU0xiSTM0c1VNcGM/view?usp=sharing

PigLab5 Video :
https://drive.google.com/file/d/0B6ZYkhJgGD6XMDFjNmI0ekg1LU0/view?usp=sharing

PigLab5 Notes :
https://drive.google.com/file/d/0B6ZYkhJgGD6XT0tvMGtheGE3anc/view?usp=sharing

PigLab6 Video :
https://drive.google.com/file/d/0B6ZYkhJgGD6XbEdmaG55a3hkRGc/view?usp=sharing


PigLab6 Notes :
https://drive.google.com/file/d/0B6ZYkhJgGD6XOVRUTkt3Y290YXM/view?usp=sharing

PigLab7 video :

https://drive.google.com/file/d/0B6ZYkhJgGD6XY25CUjlrS05QTjg/view?usp=sharing

PigLab7 Notes :

https://drive.google.com/file/d/0B6ZYkhJgGD6XbGlnQ05wbldUbUU/view?usp=sharing

PigLab8 Notes :
https://drive.google.com/file/d/0B6ZYkhJgGD6XV2ZsUGFUV0tsUVk/view?usp=sharing

PigLab8 Notes :

https://drive.google.com/file/d/0B6ZYkhJgGD6XYW1xNEZpNjdTdHc/view?usp=sharing








Thursday 8 June 2017

Hive Partitioned tables [case study]


[cloudera@quickstart ~]$ cat saleshistory
01/01/2011,2000
01/01/2011,3000
01/02/2011,5000
01/02/2011,4000
01/02/2011,1000
01/03/2011,2000
01/25/2011,3000
01/25/2011,5000
01/29/2011,4000
01/29/2011,1000
02/01/2011,2000
02/01/2011,3000
02/02/2011,8000
03/02/2011,9000
03/02/2011,3000
03/03/2011,5000
03/25/2011,7000
03/25/2011,2000
04/29/2011,5000
04/29/2011,3000
05/01/2011,2000
05/01/2011,3000
05/02/2011,5000
05/02/2011,4000
06/02/2011,1000
06/03/2011,2000
06/25/2011,3000
07/25/2011,5000
07/29/2011,4000
07/29/2011,1000
08/01/2011,2000
08/01/2011,3000
08/02/2011,5000
09/02/2011,4000
09/02/2011,1000
09/03/2011,2000
09/25/2011,3000
10/25/2011,5000
10/29/2011,4000
10/29/2011,1000
10/29/2011,5000
11/01/2011,2000
11/01/2011,3000
11/02/2011,5000
11/02/2011,4000
11/02/2011,1000
11/03/2011,2000
11/25/2011,3000
12/25/2011,5000
12/29/2011,4000
12/29/2011,1000
12/30/2011,9000
12/30/2011,40000
[cloudera@quickstart ~]$

create table myraw(dt string, amt int)
      row format delimited
        fields terminated by ',';

load data local inpath 'saleshistory' into table myraw;

create table urraw like myraw;




-----------------

insert overwrite table urraw
   select * from (
      select dt, amt from myraw
            union all
   select concat(substr(dt,1,9),'2') as dt, amt+1000 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'3') as dt, amt+4000 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'4') as dt, amt+500 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'5') as dt, amt+8000 as amt
          from myraw
            union all
   select concat(substr(dt,1,9),'6') as dt, amt+1000 as amt
          from myraw ) s;

-- 01/01/2011     1000


create table ourraw(dt array<string> , amt int);

insert overwrite table ourraw
  select split(dt,'/') , amt  from urraw;

create table sales(dt string, amt int);

insert overwrite table sales
  select concat(dt[2],'-',dt[0],'-',dt[1]), amt from
        ourraw;




create table salespart(dt string, amt int)
   partitioned by (y int, m int, d int)
 row format delimited
   fields terminated by ',';

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.dynamic.partitions.pernode=10000;






insert overwrite table salespart
   partition (y, m, d)
  select dt, amt, year(dt), month(dt), day(dt)
        from sales;

     ------    sankara.deva2016@gmail.com
   -----sub:   partitions case study.

Wednesday 17 May 2017

Pig : Udfs using Python

we can keep multiple functions
  under one program(.py)

 transoform.py
-------------------------
from pig_util  import outputSchema
@outputSchema(name:Chararray)
def  firstUpper(x):
   fc = x[0].upper()
   rc = x[1:].lower()
   n = fc+rc
   return n
@outputSchema(sex:Chararray)
def  gender(x):
   if x=='m':
      x = 'Male'
   else:
      x = 'Female'
   return x

@outputSchema(dname:chararray)
def dept(dno):
   dname="Others"
   if dno==11:
       dname = 'Marketing'
   elif dno==12:
       dname="Hr"
   elif dno==13:
      dname="Finance"
   return dname

-----------------
register  'transform.py' using jython
  as myprog

res = foreach emp generate
     id, myprog.firstUpper(name) as name,
     sal , myprog.gender(sex) as sex,
       myprog.dept(dno) as dname;
-------------------------------------


pig unions :


 data1 = load 'file1' using PigStorage(',')
   as (name:chararray, city:chararray);

 data2 = load 'file2' using PigStorage(',')
   as (name:chararray, sal:int);

d1 = foreach data1 generate
        name, city, null as sal ;
d2 = foreach data2 generate
        name, null as city, sal;

d = union d1, d2;




Tuesday 16 May 2017

Python Examples 1



name = input("Enter name ")
age = input("Enter age")

print(name, " is ", age, " years old ")
-----------------------------------

# if

a = 10
b = 25
if a>b:
  print(a , " is big")
else:
   print(b , " is big ")
-----------------------------
# nested if
a = 10
b = 20
c = 17
big = 0
if a>b:
  if a>c:
    big=a
  else:
    big=c
elif b>c:
  big=b
else:
  big=c
print("Biggest is ", big)
----------------------------------

# if and loop combination:

lst = [10,20,34,23,12,34,23,45]

big = lst[0]

for v in lst:
  if v>big:
     big=v
print("max of the list ", big)

-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = 0
cnt = 0

for v in lst:
   tot += v
   cnt += 1
avg = tot/cnt
print("avgerage : ", avg)
-------------------------

lst = [10,20,34,23,12,34,23,45]

tot = sum(lst)
cnt  = len(lst)
avg = tot/cnt
max = max(lst)
min = min(lst)
print("tot : ", tot)
print("count : ",cnt)
print("average ", avg)
print("maximum ", max )
print("Minimum ", min)
----------------------------

Accessing List Elements:



------------------------------

updating List Elements:


lst[3] = 100
print(lst)
lst[4:7] = [250]*3
print(lst)

a = [1]*10
print(a)
--------------------------

adding elements to list:

lst = [10,20,34,23,12]

lst.append(100)
lst.append(200)
print(lst)

-----------------------------

merging lists:

lst = [10,20,34,23,12]

lst2 = [100,200,300,400,500]

l = lst + lst2

lst += lst2
print(l)
print(lst)

---------------------------

Transformations:

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
    names2.append(x.upper())
print(names2)
------------------------------

Transformations:
  first char in to upper case,
  and all remainings into lower case.

names = ["ravi", "rani", "vani", "venu", "siri"]

names2 = []

for x in names:
     fc = x[0].upper()
     rc = x[1:].lower()
     name=fc+rc
     names2.append(name)
print(names2)
----------------------------------

     sankara.deva2016@gmail.com
-------------------------------------














 
   





Spark : Spark streaming and Kafka Integration

steps:

 1)  start zookeper server
 2)  Start Kafka brokers [ one or more ]
 3)  create topic .
 4)  start console producer [ to write messages into topic ]
 5) start console consumer [ to test , whether messages are stremed ]
 6) create spark streaming context,
    which streams from kafka topic.
 7) perform transformations or aggregations
 8) output operation : which will direct the results into another kafka topic.
------------------------------------------





   

 


following code tested with ,
  spark 1.6.0 and kafka 0.10.2.0

kafka and spark streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka.KafkaUtils
//1.
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
val lines = kafkaStream.map(x => x._2.toUpperCase)

val warr = lines.map(x => x.split(" "))
val pair = warr.map(x => (x,1))
val wc = pair.reduceByKey(_+_)

wc.print()
// use below code to write results into kafka topic
ssc.start

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing into kafka topic.

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>

                partition.foreach{
                  case t:(w:String,cnt:Long)=>{
                    val x = w+"\t"+cnt                
                    val props = new HashMap[String, Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            "localhost:9092")
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")

                    println(x)
                    val producer = new KafkaProducer[String,String](props)
                    val message=new ProducerRecord[String, String]("results1",null,x)
                    producer.send(message)
                  }
                }))

-- execute above code before ssc.start.
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning





-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. --? KafkaUtils.createStream()..
   needs 4 arguments.
    1st --->  streaming Context
    2nd --> zk details.
   3rd --- > consumer group id
   4th ----> Topics.
 
spark streaming can read from multiple topics.
    topic should be as  a key value pair of map object

key ---> topic name
value ---> no.of consumer threads.

 to read from multiple topics,
 the 4th argument should be as follows.
    Map("t1"->2,"t2"->4,"t3"->1)

-------------------------

   each given number of consumer threads will applied on each partition of kafka topic.

   ex: topic has 3 threads,
        consumber threads are 5.
   so , total number of threads = 15.

but these  15 theads are not parallely executed.

at shot, 5 threads for one partiton will be parallely consuming data.

to make all (15) parallel.

val numparts = 3
val kstreams = (1 to numparts).map{x =>
    val kafkaStream = KafkaUtils.createStream(ssc,   "localhost:2181","spark-streaming-consumer-   group",     Map("spark-topic" -> 5))
          }


















Pig : UDFs


Pig UDFS
----------

  UDF ---> user defined functions.
 
   adv:
       i)  custom functionalities.
      ii)  reusability.

 Pig UDFs can be developed by
    java
   python
    ruby
    c++
    javascript
    perl

step1:
   Develop udf code.

step2:
   export into jar file
   ex: /home/cloudera/Desktop/pigs.jar

step3:
   register jar file into pig.
 grunt> register Desktop/pigs.jar

step4:
   create temporory function  for udf class.

 grunt> define  ucase pig.analytics.ConvertUpper();

step5:
  calling the function:

 grunt>e =  foreach emp generate
      id, ucase(name) as name, sal,
        ucase(sex) as sex, dno;

 
package  pig.analytics;
import .....

--> ucase(name) ---> upper conversion

public class ConvertUpper extends EvalFunc<String>
  {
     public String exec(Tuple v)
      throws IOException
     {
        String str = (String)v.get(0);
        String res = str.toUpperCase();
        retrun res;
           
     }

 
 }
--------------------------
$ cat > samp
100,230,400
123,100,90
140,560,430

$ hadoop fs -copyFromLocal samp piglab

grunt> s = load 'piglab/samp'
            using PigStorage(',')
          as (a:int, b:int, c:int);



package pig.analytics;
....
public class RMax extends EvalFunc<Integer>
{
    public  Integer exec(Tuple v)
     throws IOException
    {
      int a =(Integer) v.get(0);
      int b =(Integer) v.get(1);
      int c =(Integer) v.get(2);

      int big = a; // 10,20,3
      if (a>big) big = a;
      if (b>big) big = b;
      if (c>big) big = c;
      return  big;
    }
 }

export into jar : Desktop/pigs.jar

grunt> register Desktop/pigs.jar;

grunt> define rmax pig.analytics.RMax();

grunt> res = foreach s generate *,
                   rmax(*) as max;

--------------------------------

 package pig.analytics;
 .......
 public class RowMax
   extends EvalFunc<Integer>
 {
    public Integer exec(Tuple v) throws IOException
    {
     List<Object> lobs = v.getAll() ;
     int max = 0;
     int cnt =0;
    // -20,-3,-40
     for(Object o : lobs)
     {
       cnt++;
       int val = (Integer)o;
       if(cnt==1) max = val;
       max = Math.max(max, val);
     }
     return max;
    }
 }

export in to jar : Desktop/pigs.jar
grunt> register Desktop/pigs.jar
grunt> define dynmax pig.analytics.RowMax();
grunt> r = foreach s generate *, dynmax(*) as m;
-----------------------------------------

emp = load 'piglab/emp' using PigStorage(',')
   as (id:int, name:chararray, sal:int,
     sex:chararray, dno:int);

grade()
dname()
gender()


package pig.analytics;
public class Gender extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     String s =(String) v.get(0);
     s = s.toUpperCase();
     if (s.matches("F"))
       s = "Female";
     else
       s = "Male";
     return s;
 }
}
-----------------

package pig.analytics;
public class Grade extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
     String sal =(Integer) v.get(0);
     String grade;
     if (sal>=70000)
       grade="A";
     else if (sal>=50000)
          grade="B";
         else if (sal>=30000)
               grade="C";
              else
               grade="D";
     return grade;
 }
}
------
package pig.analytics;
public class DeptName extends EvalFunc<String>
{
 public String exec(Tuple v) throws IOException
 {
    int dno = (Integer)v.get(0);
    String dname;
    switch (dno){
    case 11 :
          dname = "Marketing";
          break;
    case 12 :
          dname = "HR";
          break;
    case 13 :
          dname = "Finance";
          break;
    default:
          dname = "Others";
     }
    return dname;  
 }
}
---------------------------------

---------------------------
export into jar : Desktop/pigs.jar;
grunt> register Desktop/pigs.jar;
grunt> define gender pig.analytics.Gender();
grunt> define grade pig.analytics.Grade();
grunt> define dept pig.analytics.DeptName();

grunt> res = foreach emp generate
    id, ucase(name) as name,
     sal, grade(sal) as grade,
    gender(sex) as sex,
    dept(dno) as dname ;
---------------------------------