Spark Shell Commands to Interact with Spark-Scala
1. Objective
A shell is an interface for issuing commands to an underlying system. Apache Spark ships with an interactive Scala shell (a REPL prompt) in which we can run commands to process data. This guide is a step-by-step list of basic Spark shell commands and operations.
Before starting, you must have Spark installed. Follow this guide to install Apache Spark.
After installing Spark, you can create RDDs and apply various transformations and actions such as filter(), cache(), count(), and collect(), and inspect an RDD's partitions. In this blog, we will also discuss the integration of Spark with Hadoop: how Spark reads data from HDFS and writes data back to HDFS.
2. Scala – Spark Shell Commands
Start the Spark Shell
Because Spark is developed in Scala, it ships with an interactive Scala shell. Using this shell we will run different commands (RDD transformations and actions) to process the data.
The command to start the Apache Spark Shell:
[php]$ bin/spark-shell[/php]
2.1. Create a new RDD
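a) Read a file from the local filesystem and create an RDD.
[php]scala> val data = sc.textFile("data.txt")[/php]
Note: sc is the SparkContext instance that the shell creates for you at startup.
Note: The relative path is resolved against the directory from which you launched the shell, so create the file data.txt there (the Spark home directory if you started the shell with bin/spark-shell).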
b) Create an RDD through Parallelized Collection
[php]scala> val no = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val noData = sc.parallelize(no)[/php]
c) From an existing RDD
[php]scala> val newRDD = noData.map(data => (data * 2))[/php]
These are the three ways to create an RDD. We can use the first method when the data already lives in an external system such as the local filesystem, HDFS, HBase, Cassandra, or S3; for text files, we create the RDD by calling the textFile method of the SparkContext with a path or URL as the argument. The second approach works with existing in-memory collections, and the third creates a new RDD by applying a transformation to an existing one. Note that the transformation must be called on the RDD (noData), not on the local array (no).
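As a quick check of the second and third approaches, here is a minimal sketch meant to be pasted into spark-shell, where `sc` is already defined. The names `numbers`, `doubled`, and `result` are illustrative only.

```scala
// Assumes a running spark-shell, which provides `sc` (the SparkContext).
val numbers = sc.parallelize(1 to 10)   // RDD built from a local collection
val doubled = numbers.map(n => n * 2)   // new RDD derived from an existing RDD

// Transformations such as map are lazy; collect() is the action that
// runs the job and returns the elements to the driver as an Array.
val result = doubled.collect()
```

Calling `collect()` is only sensible for small RDDs, because it brings every element back to the driver.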
2.2. Number of Items in the RDD
Count the number of items available in the RDD. To count the items we need to call an Action:
[php]scala> data.count()[/php]
2.3. Filter Operation
Filter the RDD and create a new RDD of the items that contain the word "DataFlair". To filter, we call the filter transformation, which returns a new RDD holding the matching subset of items.
[php]scala> val DFData = data.filter(line => line.contains("DataFlair"))[/php]
2.4. Transformation and Action together
For more complex requirements, we can chain multiple operations together, for example the filter transformation followed by the count action:
[php]scala> data.filter(line => line.contains("DataFlair")).count()[/php]
2.5. Read the first item from the RDD
To read the first item from the file, you can use the following command:
[php]scala> data.first()[/php]
2.6. Read the first 5 items from the RDD
To read the first 5 items from the file, you can use the following command:
[php]scala> data.take(5)[/php]
2.7. RDD Partitions
An RDD is made up of multiple partitions. To count the number of partitions:
[php]scala> data.partitions.length[/php]
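Note: By default, sc.textFile asks for at least sc.defaultMinPartitions partitions, which is min(sc.defaultParallelism, 2), so a small local file usually ends up with 2 partitions. sc.parallelize uses sc.defaultParallelism instead, which in local mode depends on the number of cores. When we create an RDD from an HDFS file, the number of partitions is normally equal to the number of HDFS blocks in the file.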
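The defaults can be inspected and overridden from the shell. The following is a sketch for spark-shell (where `sc` is predefined); the partition counts 4 and 8 are arbitrary values chosen for illustration, and data.txt is the file created earlier in this guide.

```scala
// Assumes a running spark-shell, which provides `sc`.
val parallelism = sc.defaultParallelism       // depends on the master, e.g. the core count in local mode
val minPartitions = sc.defaultMinPartitions   // math.min(defaultParallelism, 2)

// The second argument of textFile is a minimum, so Spark may create more partitions.
// The file is only touched when partitions are computed, e.g. by lines.getNumPartitions.
val lines = sc.textFile("data.txt", 4)

// The second argument of parallelize is the exact number of slices.
val nums = sc.parallelize(1 to 100, 8)
val numSlices = nums.getNumPartitions
```

`getNumPartitions` returns the same value as `partitions.length`.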
2.8. Cache the file
Caching is an optimization technique. Once an RDD is cached in memory, all future computations work on the in-memory data, which saves disk reads and improves performance.
[php]scala> data.cache()[/php]
The RDD is not cached as soon as you run the above operation. If you visit the storage page of the web UI at http://localhost:4040/storage, it will be blank. cache() only marks the RDD for caching; the data is actually cached when we run the first action, which needs to read the data from disk.
Let's run some actions:
[php]scala> data.count()[/php]
[php]scala> data.collect()[/php]
We have now run some actions on the data file, which had to be read from disk to perform them. During this process Spark cached the file, so future operations get the data from memory, with no disk access as long as the cached partitions fit in memory. Any transformation or action we run now will be done in memory and will be much faster.
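The lazy behaviour of cache() can also be observed without the web UI. This is a sketch for spark-shell (where `sc` is predefined), using a small in-memory RDD instead of the data file so that it runs anywhere; the variable names are illustrative.

```scala
import org.apache.spark.storage.StorageLevel

// Assumes a running spark-shell, which provides `sc`.
val cached = sc.parallelize(1 to 1000).cache()   // only marks the RDD; nothing is stored yet
val levelAfterCache = cached.getStorageLevel     // MEMORY_ONLY, the level that cache() requests

val firstCount = cached.count()    // first action: computes the partitions and stores them
val secondCount = cached.count()   // later actions reuse the cached partitions

cached.unpersist()                               // drop the cached blocks
val levelAfterUnpersist = cached.getStorageLevel // back to NONE
```

After the first `count()`, the RDD appears on the storage page of the web UI; after `unpersist()` it disappears again.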
2.9. Read Data from HDFS file
To read data from an HDFS file, we can specify the complete HDFS URL in the form hdfs://IP:PORT/PATH:
[php]scala> val hFile = sc.textFile("hdfs://localhost:9000/inp")[/php]
2.10. Spark WordCount Program in Scala
Word count is one of the most popular MapReduce examples: count how many times each word occurs in the file.
[php]scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)[/php]
Read the result on the console:
[php]scala> wc.take(5)[/php]
It will display the first 5 (word, count) pairs, in no particular order.
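To see the most frequent words rather than an arbitrary five, the counts can be sorted first. This is a sketch for spark-shell (where `sc` is predefined); it uses two made-up sentences in place of the HDFS file so that it runs without a Hadoop cluster.

```scala
// Assumes a running spark-shell, which provides `sc`.
// The two sentences are made-up sample input standing in for hFile.
val lines = sc.parallelize(Seq("to be or not to be", "to see or not to see"))

val counts = lines
  .flatMap(line => line.split(" "))   // one element per word
  .map(word => (word, 1))             // pair each word with a count of 1
  .reduceByKey(_ + _)                 // add up the 1s for each distinct word

// Sort by the count (second tuple field), largest first, and keep the top entry.
val top = counts.sortBy(_._2, ascending = false).take(1)
```

With this input, "to" occurs four times and every other word twice, so the single top entry is ("to", 4).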
2.11. Write the data to HDFS file
[php]scala> wc.saveAsTextFile("hdfs://localhost:9000/out")[/php]
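saveAsTextFile writes a directory of part files, one per partition, rather than a single file. The sketch below shows the round trip on the local filesystem, assuming spark-shell in local mode (where `sc` is predefined); the temporary directory and the sample pairs are made up for illustration.

```scala
import java.nio.file.Files

// Assumes a running spark-shell in local mode, which provides `sc`.
// Build a file:// URI for an output directory that does not exist yet.
val outDir = Files.createTempDirectory("wc-demo").resolve("out").toUri.toString

val pairs = sc.parallelize(Seq(("spark", 2), ("shell", 1)))
pairs.saveAsTextFile(outDir)   // writes part-* files, one line per element via toString

// Pointing textFile at the directory reads all part files back as lines.
val readBack = sc.textFile(outDir).collect().sorted
```

The same pattern applies to an hdfs:// URL. The output directory must not already exist, otherwise the save fails.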
3. Conclusion
In conclusion, using Spark shell commands we can create an RDD (in three ways), read items from it, and inspect its partitions. We can also cache the file, read data from and write data to HDFS, and perform various operations on the data.
Now you can create your first Spark Scala project.
Could you please explain the role of the 1 in the line below, and also the meaning of reduceByKey(_ + _)?
scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
Thanks
Regarding the note in section 2.7 on the minimum number of partitions: can you please update it? The default is not always 2.
If we create an RDD with sc.textFile(), the default number of partitions is 2, assuming the file size is less than 2.
If we create an RDD with sc.parallelize, the default is sc.defaultParallelism (for example, 4 on a 4-core local machine).
Let me explain with an example.
Suppose hFile contains this single line:
this is an example this is to understand example
Let us go through the code step by step.
hFile.flatMap(line => line.split(" "))
flatMap splits each line into words, so in this case you will get:
this = arr(0)
is = arr(1)
an = arr(2)
example = arr(3)
this = arr(4)
is = arr(5)
to = arr(6)
understand = arr(7)
example = arr(8)
hFile.flatMap(line => line.split(" ")).map(word => (word, 1))
Here each word is paired with the number 1. This is the 1 you asked about: every occurrence of a word counts once.
(this,1)
(is,1)
(an,1)
(example,1)
(this,1)
(is,1)
(to,1)
(understand,1)
(example,1)
val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
Here the keys are the words and the values act as counters. reduceByKey(_ + _) merges the values of all pairs that share the same key by adding them two at a time. There is no initial 0; the first time a key is seen, its value is kept as is:
(this,1) => (this,1)
(is,1) => (is,1)
(an,1) => (an,1)
(example,1) => (example,1)
Now here is the part to note. When a key that is already present shows up again, the two values x and y are combined as x + y, which is what _ + _ means:
(this,1) => 1+1 => (this,2)
(is,1) => 1+1 => (is,2)
(to,1) => (to,1)
(understand,1) => (understand,1)
(example,1) => 1+1 => (example,2)
Hope this clears your doubt 🙂
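The same steps can be reproduced in plain Scala, without Spark, to see the intermediate values. This is only a sketch: ordinary Scala collections have no reduceByKey, so groupBy followed by reduce stands in for it here.

```scala
// Plain Scala (no Spark needed); runs in any Scala REPL, including spark-shell.
val line = "this is an example this is to understand example"

val words = line.split(" ")               // Array(this, is, an, example, this, ...)
val pairs = words.map(word => (word, 1))  // Array((this,1), (is,1), (an,1), ...)

// Stand-in for reduceByKey(_ + _): group the pairs by word,
// then add up the 1s within each group.
val counts = pairs
  .groupBy(_._1)
  .map { case (word, group) => (word, group.map(_._2).reduce(_ + _)) }
```

The resulting map holds this -> 2, is -> 2, example -> 2, and 1 for each of an, to, and understand.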