Apache Flink Real World Use Case – Crime Data Analysis Part II


1. Objective

This is the second part of Apache Flink Real World Use Case – Crime Data Analysis; for details about the case study, follow the first part. In this blog we will use various Apache Flink APIs like readTextFile, readCsvFile, includeFields, groupBy, flatMap, map, etc. to analyze the crime report use case.

Learn how to set up the Apache Flink environment in the Eclipse IDE and run a WordCount program.


2. Platform

  • Operating system: Windows / Linux / Mac
  • Apache Flink
  • Java 7.x or higher
  • IDE: Eclipse

3. Steps to Make the Project and Add Dependencies

To understand how to create a project in Eclipse and add dependencies, you can refer to this blog.

4. Sample Data

Following is the sample dataset for the use-case:

cdatetime,address,district,beat,grid,crimedescr,ucr_ncic_code,latitude,longitude
1/1/2006 0:00,3108 OCCIDENTAL DR,3,3C,1115,484 PC PETTY THEFT/INSIDE,2404,38.47350069,-121.4901858
1/1/2006 0:00,4 PALEN CT,2,2A,212,459 PC BURGLARY BUSINESS,2299,38.47350069,-121.4901858
1/1/2006 0:00,3547 P ST,3,3C,853,484 PC PETTY THEFT/INSIDE,2404,38.47350069,-121.4901858
1/1/2006 0:00,3421 AUBURN BLVD,2,2A,508,459 PC BURGLARY BUSINESS,2299,38.47350069,-121.4901858
1/1/2006 0:00,6351 DRIFTWOOD ST,4,4C,1261,SUSP PERS-NO CRIME – I RPT,7000,38.47350069,-121.4901858
1/1/2006 0:01,7001 EAST PKWY,6,6C ,1427,503 PC   EMBEZZLEMENT,2799,38.51436734,-121.3854286
1/1/2006 0:01,918 LAKE FRONT DR,4,4C,1294,TELEPEST -I RPT,7000,38.47948616,-121.5215057
1/1/2006 0:01,4851 KOKOMO DR,1,1A ,123,487(A) GRAND THEFT-INSIDE,2308,38.65994218,-121.5259008
1/1/2006 0:01,2377 OAK HARBOUR DR,1,1B ,440,653M(A) PC OBSCENE/THREAT CALL,5309,38.60893745,-121.5187927
1/1/2006 0:01,1823 P ST,3,3B ,766,484 PETTY THEFT/LICENSE PLATE,2399,38.57084621,-121.484429
1/1/2006 0:01,1100 14TH ST,3,3M,745,FOUND PROPERTY - I RPT,7000,38.57815667,-121.4876958
1/1/2006 0:01,3301 ARENA BLVD,1,1A,303,530.5 PC USE PERSONAL ID INFO,2604,38.64378844,-121.5341593
1/1/2006 0:01,1088 PREGO WAY,1,1B,404,530.5 PC USE PERSONAL ID INFO,2604,38.62798948,-121.4857344
1/1/2006 0:01,3259 SPINNING ROD WAY,1,1B,475,484J PC PUBLISH CARD INFO,2605,38.61153542,-121.5370613
1/1/2006 0:01,1010 J ST,3,3M,744,THREATS - I RPT,7000,38.57999251,-121.4930384
1/1/2006 0:01,400 BANNON ST,3,3A,704,530 PC FALSE PERS. REC PROP,2604,38.59540732,-121.4978798

The above data is the crime record, which has nine fields. You can download the data file from this link.
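Before feeding these records to Flink, it helps to see how one of them breaks down. A minimal sketch in plain Java (no Flink needed), splitting a sample record into its nine comma-separated fields; the record is copied from the dataset above:

```java
public class RecordFieldsDemo {
    public static void main(String[] args) {
        // One record from the sample data above
        String record = "1/1/2006 0:00,3108 OCCIDENTAL DR,3,3C,1115,"
                + "484 PC PETTY THEFT/INSIDE,2404,38.47350069,-121.4901858";

        // Split on commas to recover the nine fields of the schema:
        // cdatetime, address, district, beat, grid, crimedescr,
        // ucr_ncic_code, latitude, longitude
        String[] fields = record.split(",");

        System.out.println(fields.length); // 9
        System.out.println(fields[0]);     // cdatetime: 1/1/2006 0:00
        System.out.println(fields[2]);     // district:  3
    }
}
```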

5. Apache Flink Real World Use Case – Crime Data Analysis

In this section of the Apache Flink real world use case, two different problem statements are given below, along with their solutions as Apache Flink programs in Java and their output.

5.1 Problem Statement – 1

In this use case, we will analyze the data and find out, for each day, the hour of the day when the maximum number of crimes occurs (Highest Crime Hour Analysis).

a. Apache-Flink program

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;

public class Maximum_Crime_Hour {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read only the first field (cdatetime) of the nine-field CSV, skipping the header
        DataSet<Tuple1<String>> rawdata = env.readCsvFile("E:\\SacramentocrimeJanuary2006.csv")
                .includeFields("100000000")
                .ignoreFirstLine()
                .types(String.class);

        rawdata.map(new TimeExtractor()) // map each record to (date, hour, 1)
                .groupBy(0, 1)           // group by date and hour
                .sum(2)                  // count the crimes per (date, hour)
                .groupBy(0)              // regroup by date
                .maxBy(0, 2)             // per day, keep the hour with the maximum count
                .print();                // print the result
    }

    public static class TimeExtractor implements MapFunction<Tuple1<String>, Tuple3<String, String, Integer>> {

        @Override
        public Tuple3<String, String, Integer> map(Tuple1<String> time) throws Exception {
            SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH");
            SimpleDateFormat formatter2 = new SimpleDateFormat("MM/dd/yyyy HH:mm");

            String dateInString = time.f0;
            Date date = formatter2.parse(dateInString);

            String[] dateTokens = formatter.format(date).split(" ");

            return new Tuple3<>(dateTokens[0], dateTokens[1], 1);
        }
    }
}
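The heart of the program is TimeExtractor's date handling, which can be tested in isolation without the Flink runtime. A standalone sketch of the same logic: parse the cdatetime field with the lenient pattern "MM/dd/yyyy HH:mm", then re-format it to "MM/dd/yyyy HH" so the date and hour become the grouping keys:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimeExtractorDemo {
    public static void main(String[] args) throws Exception {
        // Same formats as in TimeExtractor above
        SimpleDateFormat output = new SimpleDateFormat("MM/dd/yyyy HH");
        SimpleDateFormat input  = new SimpleDateFormat("MM/dd/yyyy HH:mm");

        // Lenient parsing accepts the single-digit month/day/hour in the data
        Date date = input.parse("1/1/2006 0:01");

        // Re-format, then split into the (date, hour) grouping keys
        String[] tokens = output.format(date).split(" ");

        System.out.println(tokens[0]); // 01/01/2006
        System.out.println(tokens[1]); // 00
    }
}
```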

 

b. Output

Date,Hour,No_Of_Crimes
(01/01/2006,00,57)
(01/05/2006,20,22)
(01/08/2006,23,16)
(01/13/2006,09,20)
(01/14/2006,00,16)
(01/19/2006,09,20)
(01/20/2006,00,21)
(01/23/2006,00,24)
(01/31/2006,12,19)


5.2 Problem Statement – 2

In this use case, we will analyze the data and find out the district with the least number of crimes (Safest District).

a. Apache Flink Program

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SafeDistrict {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> rawdata = env.readTextFile("E:\\SacramentocrimeJanuary2006.csv");
        DataSet<Tuple2<String, Integer>> result = rawdata
                .flatMap(new Counter()) // emit each record as (district, 1)
                .groupBy(0)             // group the data by district
                .sum(1)                 // count the number of crimes per district
                .minBy(1);              // keep the district with the minimum crime count
        result.print();
    }

    public static class Counter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.split(",");
            // Skip the header line, whose third field is the literal "district"
            if (tokens[2].contains("district")) {
                return;
            }
            out.collect(new Tuple2<String, Integer>(tokens[2], 1));
        }
    }
}
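What the pipeline above computes can be sketched in plain Java without the Flink runtime: groupBy(0).sum(1) is a per-key count, and minBy(1) picks the entry with the smallest count. The district values below are made up for illustration; the real job reads them from the full CSV:

```java
import java.util.HashMap;
import java.util.Map;

public class SafestDistrictDemo {
    public static void main(String[] args) {
        // Hypothetical district column extracted from a handful of records
        String[] districts = {"1", "2", "1", "3", "2", "1"};

        // groupBy(0).sum(1): accumulate a count per district key
        Map<String, Integer> counts = new HashMap<>();
        for (String d : districts) {
            counts.merge(d, 1, Integer::sum);
        }

        // minBy(1): keep the entry with the smallest count
        Map.Entry<String, Integer> safest = null;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (safest == null || e.getValue() < safest.getValue()) {
                safest = e;
            }
        }
        System.out.println("(" + safest.getKey() + "," + safest.getValue() + ")"); // (3,1)
    }
}
```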

b. Output

(District,Crime_Count)
(1,868)

To learn the comparison between different big data technologies like Apache Hadoop and Apache Spark, follow this comparison guide.
