Yelp Dataset Analysis with Spark: Business Insights

This document outlines a Spark-based analysis of the Yelp dataset to extract valuable business insights. The analysis includes category-based business performance, user review patterns, and location-specific trends.

Data Loading and Preparation

First, we load the business data from a JSON file:

import org.apache.spark.sql.functions._  // explode, when, col, array, lit, month, ...
import spark.implicits._                 // enables the $"column" syntax used below

val path = "/Users/ishanhanda/Downloads/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json"
val businessDF = spark.read.json(path)
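
Before flattening, it is worth confirming the inferred schema; the explode call below assumes categories is an array of strings, as in the round 9 dump. A minimal check (the selected column names are assumptions based on that schema):

businessDF.printSchema()
// Peek at a few rows to confirm the columns used below are present
businessDF.select("name", "city", "categories", "stars", "review_count").show(5, truncate = false)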

Next, we flatten the categories array so that each business/category pair becomes its own row. The otherwise branch substitutes a single null category for businesses whose categories field is null, so explode does not drop them:

val businessCategoryFlattenedDF = businessDF.withColumn("category", explode(
    when(col("categories").isNotNull, col("categories"))
    .otherwise(array(lit(null).cast("string")))  // keep businesses that have no categories
))
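
A quick sanity check that the explode produced one row per business/category pair (a sketch; column names assumed from the business schema):

businessCategoryFlattenedDF.select("name", "city", "category").show(5, truncate = false)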

Business Analysis by City and Category

We group businesses by city and category, sum their review counts, and register the 100 most-reviewed pairs as a temporary table:

val businessByCityDF = businessCategoryFlattenedDF.groupBy("city", "category").sum("review_count")
businessByCityDF.show()
businessByCityDF.sort($"sum(review_count)".desc).limit(100).registerTempTable("q1")
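
The registered table can then be queried with Spark SQL; for example, a sketch that reads back the most-reviewed city/category pairs (the aggregated column is named sum(review_count) by Spark, hence the backticks):

spark.sql("SELECT city, category, `sum(review_count)` AS total_reviews FROM q1 ORDER BY total_reviews DESC").show(20)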

Average Stars by City and Category

We calculate the average star rating for each category within each city and register the 50 highest-rated pairs as a temporary table:

val businessByCityAverageStarsDF = businessCategoryFlattenedDF
    .groupBy("category", "city")
    .mean("stars")
    .sort($"category", $"avg(stars)".desc)
businessByCityAverageStarsDF.show()
businessByCityAverageStarsDF.sort($"avg(stars)".desc).limit(50).registerTempTable("q2")

Wisconsin Business Analysis

We filter businesses within a latitude/longitude bounding box that roughly covers Madison, Wisconsin, and calculate the average star rating by category:

val wisconsinsDF = businessCategoryFlattenedDF
    .filter("latitude > 42.908333 AND latitude < 43.241667 AND longitude > -89.583889 AND longitude < -89.250556")
    .groupBy("category")
    .mean("stars").sort($"avg(stars)".desc)
wisconsinsDF.show()
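
The same bounding box can also be written with column expressions, which is easier to reuse in the food-business query later on; a sketch (inMadisonBox and wisconsinAltDF are hypothetical names, and the comparisons mirror the strict inequalities above):

// Hypothetical helper: the Madison-area bounding box as a Column expression
val inMadisonBox =
    col("latitude") > 42.908333 && col("latitude") < 43.241667 &&
    col("longitude") > -89.583889 && col("longitude") < -89.250556
val wisconsinAltDF = businessCategoryFlattenedDF.filter(inMadisonBox).groupBy("category").mean("stars")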

User and Review Data Loading

We load user and review data from JSON files:

val path_users = "/Users/ishanhanda/Downloads/yelp_dataset_challenge_round9/yelp_academic_dataset_user.json"
val userDF = spark.read.json(path_users)
val path_reviews = "/Users/ishanhanda/Downloads/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json"
val reviewDF = spark.read.json(path_reviews)
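
The review data is joined against several times below, so caching it can avoid re-parsing the JSON on every action; a sketch, assuming the data fits in memory:

// Optional: keep the (large) review data in memory across the joins below
reviewDF.cache()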

Top 10 Users and Their Category Preferences

We identify the top 10 users by review count, join their reviews with the flattened business data, and compute each user's average star rating per category:

val top10Users = userDF
    .sort($"review_count".desc).limit(10)
// Attach each top user's reviews (null-safe equality join on user_id)
val joinUsers_Reviews = top10Users.join(reviewDF, top10Users("user_id") <=> reviewDF("user_id")).drop(reviewDF("user_id"))
// Join with the flattened business data; the business name and stars columns are dropped,
// so "name" refers to the user's name and mean("stars") aggregates the review stars.
val top10_StarsByCategory = joinUsers_Reviews
    .join(businessCategoryFlattenedDF, joinUsers_Reviews("business_id") <=> businessCategoryFlattenedDF("business_id"))
    .drop(businessCategoryFlattenedDF("name")).drop(businessCategoryFlattenedDF("stars"))
    .groupBy("name", "user_id", "category")
    .mean("stars")
top10_StarsByCategory.show()
top10_StarsByCategory.registerTempTable("q4")
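
For a side-by-side comparison, the per-user averages can also be pivoted so that each top user becomes a column; a sketch using the standard pivot API (q4Pivot and avg_stars are names introduced here):

val q4Pivot = top10_StarsByCategory
    .withColumnRenamed("avg(stars)", "avg_stars")  // rename the aggregate for easier reference
    .groupBy("category")
    .pivot("name")                                 // one column per top-10 user
    .mean("avg_stars")
q4Pivot.show(20, truncate = false)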

Food Business Analysis in Wisconsin

We analyze food-related businesses within the same Madison-area bounding box, taking the 10 highest-rated and 10 lowest-rated by star rating:

val business_Food = businessCategoryFlattenedDF
    .filter("latitude > 42.908333 AND latitude < 43.241667 AND longitude > -89.583889 AND longitude < -89.250556")
    .filter("category LIKE '%Food%'")
    .sort($"stars".desc)
val top10DF = business_Food.limit(10)                     // 10 highest-rated (business_Food is already sorted descending)
val bottom10DF = business_Food.sort($"stars").limit(10)   // 10 lowest-rated
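
A quick look at both extremes before joining them with the reviews (a sketch; columns taken from the business schema):

top10DF.select("name", "category", "stars", "review_count").show(10, truncate = false)
bottom10DF.select("name", "category", "stars", "review_count").show(10, truncate = false)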

Review Analysis by Month (January to May)

We analyze reviews from January to May for the top and bottom 10 food businesses:

// Extract the review month and keep only January through May
val reviewDF_month = reviewDF.withColumn("month", month(col("date").cast("date")))
    .filter("month >= 1 AND month <= 5")

// Average review stars per business and month for the top 10 food businesses.
// The business stars column is dropped so mean("stars") aggregates the review stars.
val avgTop10 = top10DF
    .join(reviewDF_month, reviewDF_month("business_id") <=> top10DF("business_id"))
    .drop(reviewDF_month("business_id")).drop(top10DF("stars"))
    .groupBy("business_id", "name", "month")
    .mean("stars")
avgTop10.show()
avgTop10.registerTempTable("q5_top")

val avgBottom10 = bottom10DF
    .join(reviewDF_month, reviewDF_month("business_id") <=> bottom10DF("business_id"))
    .drop(reviewDF_month("business_id")).drop(bottom10DF("stars"))
    .groupBy("business_id", "name", "month")
    .mean("stars")
avgBottom10.show()
avgBottom10.registerTempTable("q5_bottom")
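
The two temporary tables can then be compared with Spark SQL, for example by averaging the monthly ratings across each group; a sketch (the aggregate column is named avg(stars) by Spark, hence the backticks):

spark.sql("""
    SELECT month, AVG(`avg(stars)`) AS avg_top10_stars
    FROM q5_top
    GROUP BY month
    ORDER BY month
""").show()
spark.sql("""
    SELECT month, AVG(`avg(stars)`) AS avg_bottom10_stars
    FROM q5_bottom
    GROUP BY month
    ORDER BY month
""").show()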