Spark Shell Commands to Interact with Spark-Scala

1. Objective

A shell is an interface for accessing a system's services. Apache Spark ships with an interactive Scala shell in which we can run commands to process data. This guide is a step-by-step list of basic Spark shell commands and operations.

Before starting, you must have Spark installed; follow this guide to install Apache Spark.
After installing Spark, you can create RDDs and perform various transformations and actions such as filter(), partitions, cache(), count(), and collect(). In this blog, we will also discuss the integration of Spark with Hadoop: how Spark reads data from HDFS and writes data to HDFS.

2. Scala – Spark Shell Commands

Start the Spark Shell

Apache Spark ships with an interactive Scala shell, since Spark itself is developed in Scala. Using the interactive shell, we will run different commands (RDD transformations and actions) to process the data.
The command to start the Apache Spark Shell:
[php]$ bin/spark-shell[/php]

2.1. Create a new RDD

a) Read File from local filesystem and create an RDD.

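[php]scala> val data = sc.textFile("data.txt")[/php]

Note: sc is the SparkContext instance that the shell creates for you at startup.

Note: data.txt must exist in the directory from which you started the shell (the Spark home directory, if you launched it as bin/spark-shell).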

b) Create an RDD through Parallelized Collection

[php]scala> val no = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val noData = sc.parallelize(no)[/php]

c) From Existing RDDs

[php]scala> val newRDD = noData.map(data => data * 2)[/php]

These are the three ways to create an RDD. Use the first when the data already lives in an external system such as a local filesystem, HDFS, HBase, Cassandra, or S3; for text files, call the textFile method of the SparkContext with the path or URL as the argument. The second approach works with existing collections, and the third creates a new RDD from an existing one.
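The second and third approaches can be tried without any input file. The following is a minimal sketch, assuming it is pasted into spark-shell (where sc is predefined); the names numbers, doubled, and result are only illustrative.

```scala
// Run inside spark-shell, where `sc` (the SparkContext) is predefined.

// Parallelized collection: distribute a local Scala collection as an RDD.
val numbers = sc.parallelize(1 to 10)

// From an existing RDD: map is a transformation, so it returns a new RDD
// and leaves `numbers` unchanged. Nothing is computed yet.
val doubled = numbers.map(n => n * 2)

// collect() is an action: it runs the job and returns the results
// to the driver as a local Array.
val result = doubled.collect()
println(result.mkString(", "))
```

Note that map has to be called on the RDD, not on the original Scala collection; calling it on the collection would simply produce another local collection.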

2.2. Number of Items in the RDD

Count the number of items available in the RDD. To count the items we need to call an Action:

[php]scala> data.count()[/php]

2.3. Filter Operation

Filter the RDD to create a new RDD of the items that contain the word "DataFlair". To filter, we call the filter transformation, which returns a new RDD containing a subset of the items.

[php]scala> val DFData = data.filter(line => line.contains("DataFlair"))[/php]

2.4. Transformation and Action together

For more complex requirements, we can chain multiple operations together, for example the filter transformation followed by the count action:

[php]scala> data.filter(line => line.contains("DataFlair")).count()[/php]
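To try the same chain without data.txt, we can build a small RDD in memory. This is only a sketch, assuming spark-shell with sc predefined; the three sample lines are made up for illustration.

```scala
// Run inside spark-shell, where `sc` is predefined.
// Three made-up lines standing in for the contents of data.txt.
val lines = sc.parallelize(Seq(
  "DataFlair publishes Spark tutorials",
  "Spark ships with an interactive shell",
  "Learn Scala with DataFlair"
))

// filter is a transformation: it only describes the new RDD.
val matching = lines.filter(line => line.contains("DataFlair"))

// count is an action: it triggers the computation and returns a Long.
val matchCount = matching.count()
println(matchCount)
```

Two of the three sample lines contain "DataFlair", so the count is 2.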

2.5. Read the first item from the RDD

To read the first item from the file, you can use the following command:

[php]scala> data.first()[/php]

2.6. Read the first 5 items from the RDD

To read the first 5 items from the file, you can use the following command:

[php]scala> data.take(5)[/php]
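The behaviour of first() and take(n) can be sketched on a small in-memory RDD. This assumes spark-shell with sc predefined; the RDD named items is hypothetical and deliberately has fewer than five elements.

```scala
// Run inside spark-shell, where `sc` is predefined.
// A three-element RDD spread over two partitions.
val items = sc.parallelize(Seq("a", "b", "c"), 2)

// first() returns the first element of the RDD.
val firstItem = items.first()

// take(5) returns up to five elements as a local Array;
// here the RDD only has three, so all three come back.
val firstFive = items.take(5)

println(firstItem)
println(firstFive.mkString(", "))
```

Both calls are actions, and both bring data to the driver, so they are best used with small n.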

2.7. RDD Partitions

An RDD is made up of multiple partitions. To count the number of partitions:

[php]scala> data.partitions.length[/php]

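Note: For an RDD created with sc.textFile(), the default minimum number of partitions is min(sc.defaultParallelism, 2), which is usually 2. sc.parallelize() uses sc.defaultParallelism instead (in local mode, the number of worker threads, for example one per core with local[*]). When we create an RDD from an HDFS file, the number of partitions normally equals the number of blocks in the file.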
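The partition defaults can be checked directly in the shell. A small sketch, assuming spark-shell with sc predefined; the actual numbers printed depend on how the shell was started, so none are shown here.

```scala
// Run inside spark-shell, where `sc` is predefined.

// Defaults used when no partition count is given.
println(sc.defaultParallelism)    // default for sc.parallelize
println(sc.defaultMinPartitions)  // default minPartitions for sc.textFile

// Without an explicit count, parallelize uses sc.defaultParallelism.
val byDefault = sc.parallelize(1 to 100)
println(byDefault.getNumPartitions)

// The second argument sets the number of partitions explicitly.
val fourParts = sc.parallelize(1 to 100, 4)
println(fourParts.getNumPartitions)
```

sc.textFile also accepts a second argument, minPartitions, when more partitions are wanted than the default.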

2.8. Cache the file

Caching is an optimization technique. Once we cache an RDD in memory, all future computations on it work on the in-memory data, which saves disk seeks and improves performance.

[php]scala> data.cache()[/php]

The RDD is not cached as soon as you run the above operation. If you visit the web UI at http://localhost:4040/storage, it will be blank. cache() only marks the RDD for caching; the RDD is actually cached when we run the first action that needs the data to be read from disk.

Let's run some actions:

[php]scala> data.count()[/php]

[php]scala> data.collect()[/php]

We have now run some actions on the data file, which had to be read from disk to perform them. During this process Spark cached the file, so all future operations get the data from memory (no disk interaction needed). Any transformation or action we run now works on the in-memory data and is typically much faster.
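The lazy nature of cache() can also be observed from the shell instead of the web UI. This is a sketch under the assumption that it runs in spark-shell with sc predefined; the RDD named squares is hypothetical.

```scala
// Run inside spark-shell, where `sc` is predefined.
import org.apache.spark.storage.StorageLevel

val squares = sc.parallelize(1 to 1000).map(n => n.toLong * n)

// cache() only marks the RDD with the MEMORY_ONLY storage level;
// no data is stored until an action computes the RDD.
squares.cache()
val levelAfterCache = squares.getStorageLevel
println(levelAfterCache == StorageLevel.MEMORY_ONLY)

// The first action computes the RDD and fills the cache.
val itemCount = squares.count()

// Later actions on `squares` can reuse the cached partitions.
val total = squares.reduce(_ + _)

// unpersist() removes the RDD from the cache when it is no longer needed.
squares.unpersist()
val levelAfterUnpersist = squares.getStorageLevel
println(levelAfterUnpersist == StorageLevel.NONE)
```

After the count() call, the RDD should appear on the Storage tab of the web UI until unpersist() is called.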

2.9. Read Data from HDFS file

To read data from an HDFS file, specify the complete HDFS URL in the form hdfs://IP:PORT/PATH:

[php]scala> val hFile = sc.textFile("hdfs://localhost:9000/inp")[/php]

2.10. Spark WordCount Program in Scala

Word count is one of the most popular MapReduce examples: count how many times each word occurs in the file.

[php]scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)[/php]

Read the result on the console:

[php]scala> wc.take(5)[/php]

It will display the first 5 results.
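To see what each stage of the word count produces, the chain can be split into named steps. The following is a sketch on a one-line in-memory input rather than an HDFS file, assuming spark-shell with sc predefined; the intermediate names words, pairs, and counts are only illustrative.

```scala
// Run inside spark-shell, where `sc` is predefined.
// One line of text standing in for the contents of the HDFS file.
val text = sc.parallelize(Seq("this is an example this is to understand example"))

// flatMap splits every line into words and flattens them into one RDD of words.
val words = text.flatMap(line => line.split(" "))

// map pairs each word with the number 1, meaning "one occurrence seen".
val pairs = words.map(word => (word, 1))

// reduceByKey groups the pairs by word (the key) and combines the values
// of each group with the given function; _ + _ is short for (x, y) => x + y.
val counts = pairs.reduceByKey(_ + _)

// Bring the small result to the driver and print it.
val countsByWord = counts.collect().toMap
countsByWord.foreach { case (word, n) => println(s"$word -> $n") }
```

The 1 is therefore just an initial count for each occurrence of a word, and reduceByKey(_ + _) adds those ones up per word: "this", "is", and "example" end up with 2, while the remaining words keep 1.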

2.11. Write the data to HDFS file

To write the data to HDFS:

[php]scala> wc.saveAsTextFile("hdfs://localhost:9000/out")[/php]
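The same call can be tried without a running HDFS by writing to a local directory and reading the result back. This is a sketch assuming spark-shell in local mode with sc predefined; the temporary directory and the two sample pairs are made up for illustration.

```scala
// Run inside spark-shell (local mode), where `sc` is predefined.
import java.nio.file.Files

// Build a file:// URI for a directory that does not exist yet;
// saveAsTextFile refuses to write into an existing output directory.
val outputDir = Files.createTempDirectory("spark-shell-demo").resolve("out").toUri.toString

// A tiny pair RDD in a single partition, standing in for `wc`.
val pairsToSave = sc.parallelize(Seq(("spark", 2), ("shell", 1)), 1)

// Each element is written as one line of text using its toString form.
pairsToSave.saveAsTextFile(outputDir)

// The output is a directory of part files; textFile reads them all back.
val reloaded = sc.textFile(outputDir).collect().sorted
println(reloaded.mkString("\n"))
```

Running saveAsTextFile a second time with the same path fails because the output directory already exists, so choose a new path or delete the old output first.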

3. Conclusion

In conclusion, using Spark shell commands we can create an RDD (in three ways), read items from it, and inspect its partitions. We can also cache the file, read data from and write data to HDFS, and perform various operations on the data.
Now you can create your first Spark Scala project.

8 Responses

  1. Elsie says:

    Reading your website is pure pleasure for me, thanks for sharing such a lovely blog on Spark shell commands.

    • Data Flair says:

      Hii Elsie,
      You are amazing. Grateful to you for such kind words on our hard work of Spark Shell Commands. Visit again on Data Flair for more Spark learning.
      Good luck from the site.

  2. purna says:

    Excellent tutorial. I like your images and videos.

    • Data Flair says:

      Purna, glad to see such a lovely review on our Spark Shell Commands. These images and videos are specially designed to explain the Spark topics easily via visuals. Follow our links for more Spark Tutorials to learn Spark deeply.

  3. Devi says:

    Could u please explain, the role of 1 in below line, and also and what is the meaning of reduceByKey(_+_) ?
    scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    Thanks

  4. ekram says:

    Note: Minimum no. of partitions in the RDD is 2 (by default). When we create RDD from HDFS file then a number of blocks will be equals to the number of partitions.
    can you please update Minimum no. of partitions in the RDD is 4 (by default) not 2.

  5. ekram says:

    if we will create rdd from sc.textFile() then default partitions is 2 assuming file size is less the 2 ,
    if we will create rdd from sc.parallelize then default partitions is 4

  6. Nagini S says:

    Let me explain with an example.
    val hfile = "this is an example this is to understand example"
    // Here hfile is the string (think of it as one line of the file). Let us split the code one by one.
    val arr = hfile.split(" ")
    In this case you will get it as
    this = arr(0)
    is = arr(1)
    an = arr(2)
    example = arr(3)
    this = arr(4)
    is = arr(5)
    to = arr(6)
    understand = arr(7)
    example = arr(8)

    hFile.flatMap(line => line.split(" ")).map(word => (word, 1))
    Here the words will be given with a number

    (this,1)
    (is,1)
    (an,1)
    (example,1)
    (this,1)
    (is,1)
    (to,1)
    (understand,1)
    (example,1)

    val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    Here the keys are the words and the values are the counters. reduceByKey(_ + _) adds together the values of all pairs that share the same key. A key that appears only once keeps its value unchanged:

    (an,1) => (an,1)
    (to,1) => (to,1)
    (understand,1) => (understand,1)

    Now here is the one you need to note. When a key appears more than once, _ + _ is called as (x, y) => x + y on the values of that key:

    (this,1), (this,1) => (this,1+1) => (this,2)
    (is,1), (is,1) => (is,1+1) => (is,2)
    (example,1), (example,1) => (example,1+1) => (example,2)

    Hope it clears your doubt 🙂
