Spark in Python 安装和实例

1.1 介绍

Spark是一个快速通用的集成计算系统,专门用于处理海量数据。Spark为Scala,Java,Python,R提供了大量丰富的接口。Spark主要包括四个工具集:Spark SQL用于处理SQL和DataFrames,MLlib用于机器学习函数实现,GraphX用于图形处理,Spark Streaming用于流处理。

1.2 Spark安装 (python in windows)

1.2.1 下载GOW

下载链接:https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c

目的:在Windows下使用Linux命令

1.2.2 下载Anaconda

下载链接:https://www.anaconda.com/download/

目的:Python包管理工具

1.2.3 下载Spark

下载链接:http://spark.apache.org/downloads.html

下载完成后将Spark解压到你的目标文件夹内,如 D:\Spark\spark-2.2.0-bin-hadoop2.7

1.2.4 下载winutils.exe

下载方式:在上述Spark目录中运行:

1
curl -k -L -o winutils.exe https://github.com/steveloughran/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true

目的:有效解决hadoop在windows运行出现的bug

1.2.5 确定计算机上安装了Java 7+版本

命令:

1
java -version

1.2.6 环境变量设置

命令:永久设置环境变量

1
2
3
4
setx SPARK_HOME D:\Spark\spark-2.2.0-bin-hadoop2.7
setx HADOOP_HOME D:\Spark\spark-2.2.0-bin-hadoop2.7
setx PYSPARK_DRIVER_PYTHON ipython
setx PYSPARK_DRIVER_PYTHON_OPTS notebook

你也可以选择在Windows编辑环境变量的地方直接添加,效果是一样的。做完以上四句以后将 D:\Spark\spark-2.2.0-bin-hadoop2.7\bin 也加到环境变量中去。

1.2.7 运行PySpark

1
pyspark --master local[2]

在ipython notebook上双核运行pyspark。

1.2.8 Import pyspark

做完以上步骤你会发现已经可以在ipython的交互环境下使用pyspark,但是当import pyspark的时候会发现出现错误 ImportError: No module named ‘pyspark’。这个时候就涉及到如何将PySpark导入Python的问题。

解决方法:
打开文件夹 D:\Spark\spark-2.2.0-bin-hadoop2.7,看到里面有一个python的文件夹,我们要导入的lib文件就在这个文件夹里。进入D:\Spark\spark-2.2.0-bin-hadoop2.7\python\lib,里面有两个文件:

  • py4j-0.10.4-src
  • pyspark

我们将这个路径添加到我们Python的包的路径即可。

具体操作:打开C:\Python35\Lib\site-packages,在里面新建文件,后缀名为.pth,在里面加入D:\Spark\spark-2.2.0-bin-hadoop2.7\python\lib即可。

1.3 Example

下载Spark的目的是为了用它做Matrix Factorization,提升代码运行的速度和效率,所以这里用MF做例子。Spark使用ALS训练Matrix,读入用户物品评分矩阵并转化为dataframe,然后在上面做Matrix Factorization。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Example").getOrCreate()

lines = spark.read.text("D:/GitCode/item_based_collaborative_filtering/test files/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

里面使用的数据集和源代码可以在我的Github上找到。


参考链接: