Spark machine learning inventory
https://github.com/claesenm/spark-ml-inventory
Apache Spark & Python (pySpark) tutorials for Big Data Analysis and Machine Learning as IPython / Jupyter notebooks: http://jadianes.me/spark-py-notebooks/
- MLlib: Classification with Logistic Regression: https://github.com/jadianes/spark-py-notebooks/blob/master/nb8-mllib-logit/nb8-mllib-logit.ipynb
Spark SQL and DataFrames: Python and Spark
Step 1: Getting the Data and Creating the RDD
import urllib
# Download the KDD Cup 1999 "10 percent" dataset (Python 2 urllib; in Python 3 this lives in urllib.request)
f = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
# Load the gzipped file into an RDD of text lines and cache it, since we will reuse it
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()
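As a quick, optional sanity check (not part of the original steps), we can count the records and peek at the first raw CSV line; the count forces the file to be read and cached:
# Count the lines and look at one raw record
print(raw_data.count())
print(raw_data.take(1))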
Step 2: Create a DataFrame
A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. DataFrames can be constructed from a wide array of sources, such as, in our case, an existing RDD.
The entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance, all we need is a SparkContext reference.
from pyspark.sql import SQLContext
# sc is the SparkContext already available in the notebook
sqlContext = SQLContext(sc)
Inferring the Schema: With a SQLContext, we are ready to create a DataFrame from our existing RDD. But first we need to tell Spark SQL the schema of our data.
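One way to do this is to turn each CSV line into a Row with named fields and let Spark SQL infer the types. The sketch below assumes the standard KDD Cup 99 column order (duration, protocol_type, service, flag, src_bytes, dst_bytes as the first six fields) and keeps only those; the queries later in this post only need duration, protocol_type and dst_bytes.
from pyspark.sql import Row
# Split each raw CSV line into its fields
csv_data = raw_data.map(lambda l: l.split(","))
# Build an RDD of Row objects with named columns, casting numeric fields to int
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]),
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
))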
Step 3: Infer the Schema and Register the Table
Once we have our RDD of Row objects, we can infer the schema and register the resulting DataFrame as a temporary table.
interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")
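To check what Spark SQL inferred from the Row objects, we can print the schema (a quick optional check; the exact field list depends on the Row definition above):
interactions_df.printSchema()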
Now we can run SQL queries over our DataFrame, since it has been registered as a table.
# Select tcp network interactions with duration longer than 1000 seconds and no bytes transferred from the destination
tcp_interactions = sqlContext.sql("""
SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()
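For comparison, the same selection can be expressed with DataFrame operations instead of SQL (a minimal sketch using the column-expression API; assuming the Row fields defined earlier, it selects the same rows as the query above):
# Equivalent filter/select written as DataFrame operations
tcp_interactions_df = interactions_df.filter(
    (interactions_df.protocol_type == "tcp") &
    (interactions_df.duration > 1000) &
    (interactions_df.dst_bytes == 0)
).select("duration", "dst_bytes")
tcp_interactions_df.show()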
The results of SQL queries are DataFrames; by dropping down to the underlying RDD with .rdd, they support all the normal RDD operations.
# Output duration together with dst_bytes, formatted as strings via the underlying RDD
tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
# collect() brings the (small) filtered result to the driver so we can print it
for ti_out in tcp_interactions_out.collect():
    print ti_out
One thing to remember: you can't map a DataFrame directly, but you can convert it to an RDD and map that with spark_df.rdd.map(). Prior to Spark 2.0, spark_df.map would alias to spark_df.rdd.map(); with Spark 2.0 you must call .rdd explicitly first.
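As a small hypothetical snippet (not from the notebook) illustrating that point with the tcp_interactions DataFrame from above:
# In Spark 2.x (Python) a DataFrame has no .map(); go through the underlying RDD instead
durations = tcp_interactions.rdd.map(lambda p: p.duration)
print(durations.take(5))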