Question: Ok, just a few more issues in my code! Project 2, Parts G and H. Previous code follows below.

Ok just a few more issues in my code! project 2 part G and H

previous code:

Part A

import numpy as np

import pandas as pd

import matplotlib.pyplot as plt

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, expr

spark = SparkSession.builder.getOrCreate()

Part B

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

accepted_schema = StructType([

StructField("acc_term_id", StringType(), True),

StructField("sid", IntegerType(), True),

StructField("first_name", StringType(), True),

StructField("last_name", StringType(), True),

StructField("major", StringType(), True)

])

alumni_schema = StructType([

StructField("sid", IntegerType(), True)

])

expelled_schema = StructType([

StructField("sid", IntegerType(), True)

])

unretained_schema = StructType([

StructField("sid", IntegerType(), True)

])

faculty_schema = StructType([

StructField("fid", IntegerType(), True),

StructField("first_name", StringType(), True),

StructField("last_name", StringType(), True),

StructField("dept", StringType(), True)

])

courses_schema = StructType([

StructField("dept", StringType(), True),

StructField("course", StringType(), True),

StructField("prereq", StringType(), True),

StructField("credits", IntegerType(), True)

])

grades_schema = StructType([

StructField("term_id", StringType(), True),

StructField("course", StringType(), True),

StructField("sid", IntegerType(), True),

StructField("fid", IntegerType(), True),

StructField("grade", StringType(), True)

])

accepted = spark.read.csv("/FileStore/tables/univ/accepted.csv", header=True, schema=accepted_schema)

alumni = spark.read.csv("/FileStore/tables/univ/alumni.csv", header=True, schema=alumni_schema)

expelled = spark.read.csv("/FileStore/tables/univ/expelled.csv", header=True, schema=expelled_schema)

unretained = spark.read.csv("/FileStore/tables/univ/unretained.csv", header=True, schema=unretained_schema)

faculty = spark.read.csv("/FileStore/tables/univ/faculty.csv", header=True, schema=faculty_schema)

courses = spark.read.csv("/FileStore/tables/univ/courses.csv", header=True, schema=courses_schema)

grades = spark.read.csv("/FileStore/tables/univ/grades.csv", header=True, schema=grades_schema)

dataframes = [accepted, alumni, expelled, unretained, faculty, courses, grades]

names = ["accepted", "alumni", "expelled", "unretained", "faculty", "courses", "grades"]

for name, df in zip(names, dataframes):

count = df.count()

print(f"The number of records in {name} is {count}.")

Part C

enrolled = accepted.join(grades, accepted.sid == grades.sid, "inner").select(accepted.sid.alias("enrolled_sid"), "*")

current = enrolled.join(alumni, enrolled.enrolled_sid == alumni.sid, "left_anti") \

.join(unretained, enrolled.enrolled_sid == unretained.sid, "left_anti") \

.join(expelled, enrolled.enrolled_sid == expelled.sid, "left_anti")

former = enrolled.join(current, enrolled.enrolled_sid == current.enrolled_sid, "left_anti")

counts = {

"Number of accepted students: ": accepted.count(),

"Number of enrolled students: ": enrolled.count(),

"Number of current students: ": current.count(),

"Number of former students: ": former.count(),

"Number of unretained students:": unretained.count(),

"Number of expelled students: ": expelled.count(),

"Number of alumni: ": alumni.count()

}

for key, value in counts.items():

print(f"{key} {value}")

Part D

num_current_students = current.count()

major_distribution = current.groupBy('major').count()

major_distribution = major_distribution.withColumnRenamed('count', 'n_students')

major_distribution = major_distribution.withColumn('prop',

(col('n_students') / num_current_students).cast('double'))

major_distribution = major_distribution.orderBy('prop', ascending=False)

major_distribution.show()

Part E

sp21_enr = grades.filter(grades.term_id == "2021A")

department_enrollments = sp21_enr.join(courses, sp21_enr.course == courses.course, "inner")

department_enrollments = department_enrollments.groupBy("dept").count().withColumnRenamed("count", "n_students")

total_enrollments = sp21_enr.count()

department_enrollments = department_enrollments.withColumn("prop", (col("n_students") / total_enrollments).cast("double"))

department_enrollments = department_enrollments.orderBy("prop", ascending=False)

department_enrollments.show()

Part F:

former_by_major = former.groupBy("major").count() \

.withColumnRenamed("count", "n_former") \

.orderBy("major")

alumni_by_major = alumni.join(accepted, accepted.sid == alumni.sid, "inner") \

.groupBy("major").count() \

.withColumnRenamed("count", "n_alumni") \

.orderBy("major")

graduation_rates = former_by_major.join(alumni_by_major, "major", "left") \

.withColumn("grad_rate", (col("n_alumni") / col("n_former")).cast("double")) \

.orderBy("major")

graduation_rates.show()

Part G: Number of Terms Required for Graduation. In this part, we
will find a frequency distribution for the number of terms that alumni

Part H: I have the same ‘sid’ error as G.

from pyspark.sql.functions import udf

from pyspark.sql.types import DoubleType

def letter_to_num(letter_grade):

grade_mapping = {"A": 4.0, "B": 3.0, "C": 2.0, "D": 1.0, "F": 0.0}

return grade_mapping.get(letter_grade, 0.0)

letter_to_num_udf = udf(letter_to_num, DoubleType())

current_gpa = grades.join(courses, grades.course == courses.course, "inner") \

.withColumn("num_grade", letter_to_num_udf(col("grade"))) \

.withColumn("gp", col("credits") * col("num_grade")) \

.groupBy("sid").agg(expr("SUM(gp) AS gp_sum"), expr("SUM(credits) AS credits_sum")) \

.withColumn("gpa", (col("gp_sum") / col("credits_sum")).cast("double")) \

.join(current, "sid", "inner") \

.select("sid", "first_name", "last_name", "major", "gpa") \

.orderBy("gpa")

current_gpa.show(10)

required for graduation. Perform the steps below by chaining together DataFrame methods in a single multi-line statement. Apply a filtering join to the grades

 

Part G: Number of Terms Required for Graduation. In this part, we will find a frequency distribution for the number of terms that alumni required for graduation. Perform the steps below by chaining together DataFrame methods in a single multi-line statement. Apply a filtering join to the grades DataFrame to keep only records associated with alumni. Group the filtered DataFrame according to sid. Perform an aggregation to count the number of DISTINCT values of term_id appearing for each alumnus. Name the resulting column n_terms. Group this most recent result according to n_terms. Perform an aggregation to count the number of records associated with each value of n_terms. Name the resulting column n_alumni. Sort the results by n_terms. Use show() to display all records in the resulting DataFrame.

Step by Step Solution

There are 3 Steps involved in it

1 Expert Approved Answer
Step: 1 Unlock blur-text-image
Question Has Been Solved by an Expert!

Get step-by-step solutions from verified subject matter experts

Step: 2 Unlock
Step: 3 Unlock

Students Have Also Explored These Related Programming Questions!