Question: Write a BFS search function using Apache Spark and pandas DataFrames:
import pandas as pd  # assumes a live SparkSession named `spark`, as in the course notebook

# This is a pairs notation of the edges, for simplicity of visualization
graph = [('A','B'), ('A','C'), ('A','D'), ('C','F'),
         ('F','A'), ('B','G'), ('G','H'), ('D','E')]

# Here's an equivalent dictionary representation that we can use for a
# Pandas DataFrame...
simple_dict = {'from_node': ['A','A','A','C','F','B','G','D'],
               'to_node':   ['B','C','D','F','A','G','H','E']}

simple_graph_df = pd.DataFrame.from_dict(simple_dict)
simple_graph_sdf = spark.createDataFrame(simple_graph_df)
simple_graph_sdf.show()
As you can see, each row of this DataFrame represents an edge between two nodes. Although the nodes are labeled "from" and "to", the edges are actually undirected, meaning that A-->B represents the same edge as B-->A.
Let's define our starting node as follows:
smallOrig = [{'node': 'A'}]
TODO: Write spark_bfs_1_round(visited_nodes) that takes the current DataFrame of visited_nodes, performs one round of BFS, and returns an updated visited-nodes DataFrame. You should assume that a temporary sdf G already exists.
def spark_bfs_1_round(visited_nodes):
    """
    :param visited_nodes: dataframe with columns node and distance
    :return: dataframe of updated visited nodes, with columns node and distance
    """
    # TODO
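Here is one possible sketch of a single round, not an official solution: it assumes the graph is registered as a temp view named G with columns from_node and to_node, treats every edge as undirected by joining in both directions, and keeps the smallest distance seen per node. The temp view name 'visited' is an arbitrary choice for this sketch.

def spark_bfs_1_round(visited_nodes):
    """
    Expand every visited node one hop along the undirected edges in the
    temp view G, then merge the new arrivals back in, keeping the
    minimum distance observed for each node.
    """
    visited_nodes.createOrReplaceTempView('visited')  # arbitrary view name for this sketch
    return spark.sql("""
        SELECT node, MIN(distance) AS distance
        FROM (
            SELECT node, distance FROM visited                        -- already-visited nodes
            UNION ALL
            SELECT G.to_node AS node, v.distance + 1 AS distance      -- follow edges forward
            FROM visited v JOIN G ON v.node = G.from_node
            UNION ALL
            SELECT G.from_node AS node, v.distance + 1 AS distance    -- ...and backward (undirected)
            FROM visited v JOIN G ON v.node = G.to_node
        ) t
        GROUP BY node
    """)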
Now, run the inner function on simple_1_round_bfs_sdf, the result of 1 round of BFS on the simple graph, and store the results in simple_bfs_result. This is ultimately what the output of BFS to depth 2 should look like.
simple_graph_sdf.createOrReplaceTempView('G')
simple_bfs_result = # TODO
simple_bfs_result.show()
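Under that reading, a minimal sketch (assuming the notebook supplies simple_1_round_bfs_sdf with columns node and distance):

simple_graph_sdf.createOrReplaceTempView('G')
# one more round on top of the provided 1-round result reaches depth 2
simple_bfs_result = spark_bfs_1_round(simple_1_round_bfs_sdf)
simple_bfs_result.show()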
Convert this result to Pandas, sorted by the node.
simple_bfs_test = #TODO
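A minimal sketch of the conversion; toPandas() collects the result to the driver, which is fine at this scale:

simple_bfs_test = (simple_bfs_result
                   .toPandas()
                   .sort_values('node')
                   .reset_index(drop=True))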
Now, we will fully implement spark_bfs. This function should iteratively call your implemented spark_bfs_1_round and return its output at max_depth.
You are also responsible for initializing the starting DataFrame, that is, converting the list of origin nodes into a Spark DataFrame with the nodes recorded at distance 0.
Consider the following:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("node", StringType(), True)])
my_sdf = spark.read.format("csv").schema(schema).load("my.csv")
The schema specifies the structure of the Spark DataFrame: a single string node column. spark.read then loads the CSV with this schema applied. Also, you are responsible for ensuring that a view of your graph is available within this function. (Note: you will also need to add in a distance column.)
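Following that hint, one hedged way to write the schema with the extra distance column the note asks for (the name bfs_schema is an assumption of this sketch):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

bfs_schema = StructType([
    StructField("node", StringType(), True),
    StructField("distance", IntegerType(), True),  # the added distance column
])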
TODO: implement spark_bfs(G, origins, max_depth) and run it on review_graph_sdf initialized in 4.3. Note: you may want to run tests on the simple_graph example, as the review_graph_sdf will take quite some time to run.
# TODO: iterative search over undirected graph
# Worth 5 points directly, but will be needed later
def spark_bfs(G, origins, max_depth):
    """
    runs distributed BFS to a specified max depth
    :param G: graph dataframe from 4.3
    :param origins: list of origin nodes stored as {"node": nodeValue}
    :param max_depth: integer value of max depth to run BFS to
    :return: dataframe with columns node, distance of all visited nodes
    """
    # TODO

# Remember that if you want to go from Pandas dataframes to Spark dataframes,
# you may need to write to a CSV and read it back.
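Putting the pieces together, a sketch of the full driver. It reuses the spark_bfs_1_round sketch above and builds the starting DataFrame with spark.createDataFrame rather than the CSV round-trip the comment mentions; if your environment really does require the CSV detour, swap that step in.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def spark_bfs(G, origins, max_depth):
    """Distributed BFS to max_depth, layered on spark_bfs_1_round above."""
    # make the graph visible to the SQL inside spark_bfs_1_round
    G.createOrReplaceTempView('G')

    # starting DataFrame: every origin node recorded at distance 0
    schema = StructType([
        StructField("node", StringType(), True),
        StructField("distance", IntegerType(), True),
    ])
    visited = spark.createDataFrame([(o['node'], 0) for o in origins], schema)

    # each round extends the frontier by one hop, so max_depth rounds
    # visit exactly the nodes within max_depth hops of an origin
    for _ in range(max_depth):
        visited = spark_bfs_1_round(visited)
    return visited

For example, spark_bfs(simple_graph_sdf, smallOrig, 2).show() should reproduce the depth-2 result from the simple graph above.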
