Question: Write a BFS search function using Apache Spark and pandas DataFrames:
# This is a pairs notation of the edges, for simplicity of visualization
graph = [('A','B'), ('A','C'), ('A','D'), ('C','F'), ('F','A'), ('B','G'), ('G','H'), ('D','E')]

# Here's an equivalent dictionary representation that we can use for a
# Pandas DataFrame...
simple_dict = {'from_node': ['A', 'A', 'A', 'C', 'F', 'B', 'G', 'D'],
               'to_node':   ['B', 'C', 'D', 'F', 'A', 'G', 'H', 'E']}

simple_graph_df = pd.DataFrame.from_dict(simple_dict)
simple_graph_sdf = spark.createDataFrame(simple_graph_df)
simple_graph_sdf.show()
As you can see, each row of this dataframe represents an edge between two nodes. Although the columns are labeled "from" and "to", the edges are actually undirected, meaning that (A, B) represents the same edge as (B, A).
Let's define our starting node as follows:
smallOrig = [{'node': 'A'}]
TODO: Write spark_bfs_round(visited_nodes), which takes the current dataframe of visited nodes, performs one round of BFS, and returns an updated visited-nodes dataframe. You should assume that a temporary view of the graph sdf, named G, already exists.
def spark_bfs_round(visited_nodes):
    """
    :param visited_nodes: dataframe with columns node and distance
    :return: dataframe of updated visited nodes, with columns node and distance
    """
    # TODO
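One possible sketch of this round function, assuming a SparkSession named spark, a temporary view G with columns from_node and to_node (as in simple_graph_sdf above), and undirected edges. This is an illustrative approach, not the official solution:

from pyspark.sql import functions as F

def spark_bfs_round(visited_nodes):
    """
    :param visited_nodes: dataframe with columns node and distance
    :return: dataframe of updated visited nodes, with columns node and distance
    """
    visited_nodes.createOrReplaceTempView('visited')
    # Expand the frontier one hop; union both join directions because edges are undirected
    frontier = spark.sql("""
        SELECT neighbor AS node, MIN(distance) + 1 AS distance
        FROM (
            SELECT g.to_node AS neighbor, v.distance
            FROM G g JOIN visited v ON g.from_node = v.node
            UNION ALL
            SELECT g.from_node AS neighbor, v.distance
            FROM G g JOIN visited v ON g.to_node = v.node
        ) sub
        GROUP BY neighbor
    """)
    # Keep the minimum distance per node, so already-visited nodes keep their shorter distance
    return (visited_nodes.unionByName(frontier)
            .groupBy('node')
            .agg(F.min('distance').alias('distance')))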
Now, run the inner function on the simple graph sdf for one round of BFS and store the results in simple_bfs_result. This is ultimately what the output of BFS to depth 1 should look like.
simple_graph_sdf.createOrReplaceTempView('G')
simple_bfs_result = None  # TODO
simple_bfs_result.show()
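A plausible completion, assuming the spark_bfs_round sketch above and starting from the smallOrig list at distance 0:

origins_df = pd.DataFrame(smallOrig)
origins_df['distance'] = 0
simple_bfs_result = spark_bfs_round(spark.createDataFrame(origins_df))
simple_bfs_result.show()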
Convert this result to Pandas, sorted by the node column.
simple_bfs_test = None  # TODO
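For example, assuming simple_bfs_result from the previous step:

simple_bfs_test = (simple_bfs_result.toPandas()
                   .sort_values('node')
                   .reset_index(drop=True))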
Now, we will fully implement spark_bfs. This function should iteratively call your implemented version of spark_bfs_round and ultimately return the output of this function at max_depth.
You are also responsible for initializing the starting dataframe, that is, converting the list of origin nodes into a Spark DataFrame with the origin nodes logged at distance 0.
Consider the following:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('node', StringType(), True)])
my_sdf = spark.read.format('csv').schema(schema).load('my.csv')
The schema ultimately specifies the structure of the Spark DataFrame, with a string node column. The snippet then calls load to read the CSV with this schema. Also, you are responsible for ensuring that a view of your graph is available within this function. Note: you will also need to add in a distance column.
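A sketch of that initialization via the CSV round-trip, assuming the smallOrig list above; the file name origins.csv and variable names are illustrative:

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

origins_pdf = pd.DataFrame(smallOrig)        # one row per origin node
origins_pdf['distance'] = 0                  # origins start at distance 0
origins_pdf.to_csv('origins.csv', index=False, header=False)

schema = StructType([
    StructField('node', StringType(), True),
    StructField('distance', IntegerType(), True),
])
origins_sdf = spark.read.format('csv').schema(schema).load('origins.csv')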
TODO: implement spark_bfs(G, origins, max_depth) and run it on review_graph_sdf, initialized earlier. Note: you may want to run tests on the simple graph example, as review_graph_sdf will take quite some time to run.
# TODO: iterative search over undirected graph
# Worth points directly, but will be needed later
def spark_bfs(G, origins, max_depth):
    """
    runs distributed BFS to a specified max depth
    :param G: graph dataframe
    :param origins: list of origin nodes stored as {'node': node_value}
    :param max_depth: integer value of max depth to run BFS to
    :return: dataframe with columns node, distance of all visited nodes
    """
    # TODO
    # Remember that if you want to go from Pandas dataframes to Spark dataframes,
    # you may need to write to a CSV and read it back.
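One possible sketch, assuming the spark_bfs_round function from above. It uses spark.createDataFrame(pandas_df) for brevity; the CSV round-trip shown earlier works the same way:

import pandas as pd

def spark_bfs(G, origins, max_depth):
    """
    runs distributed BFS to a specified max depth (illustrative sketch)
    """
    # Make the graph available to spark_bfs_round via the temporary view G
    G.createOrReplaceTempView('G')
    # Initialize the visited-nodes dataframe: every origin starts at distance 0
    origins_pdf = pd.DataFrame(origins)
    origins_pdf['distance'] = 0
    visited = spark.createDataFrame(origins_pdf)
    # Each round expands the frontier by one hop, up to max_depth hops
    for _ in range(max_depth):
        visited = spark_bfs_round(visited)
    return visited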