1. 机器学习数学基础
-
高等数学
-
线性代数
-
概率论和统计学
1.1. 线性代数
1.1.1. 什么是矩阵
-
矩阵(Matrix)是一个按照长方阵列排列的复数或实数集合
-
矩阵最早来自于方程组的系数及常数所构成的方阵,最初是用来解决线性方程求解的工具
-
矩阵是高等代数中的常见工具,也常见于统计分析等应用数学学科中;矩阵在物理学和计算机科学中都有应用
-
矩阵的运算是数值分析领域的重要问题
1.1.2. 矩阵
矩阵的定义
以上是\(m \times n\)矩阵的定义。
-
由\(m \times n\)个数\(a_{ij}, (i = 1,2,...,m; j = 1,2,...,n)\)排成的m行n列的数表 A就称为m行n列的矩阵。
-
这\(m \times n\)个数称作矩阵A的元素\(a_{ij}\),元素位于矩阵A的第i行第j列。
-
\(m \times n\)矩阵A可以记作\(A_{m \times n}\),其中m是行数,n是列数,m, n > 0。
特殊矩阵
对于\(A_{m \times n}\),如果m=n,即矩阵的行数与列数相等,那么称A为方阵。例如:
1.1.3. 矩阵中的概念
-
行数与列数都等于n的矩阵称为n阶矩阵,又称做n阶方阵,可以记作\(A_n\)。
-
只有一行的矩阵\(A_{1 \times n}\)称为行矩阵,又叫行向量。
-
同样,只有一列的矩阵\(A_{n \times 1}\)称为列矩阵,又叫列向量。
-
对于方阵,从左上角到右下角的直线,叫做主对角线,主对角线上的元素称为主对角线元素。
特殊矩阵
-
矩阵的元素全部为0,称为零矩阵,用O表示。
-
对于方阵,如果只有对角线元素为1,其余元素都为0,那么称为单位矩阵,一般用I或者E表示。
-
对于方阵,不在对角线上的元素都为0,称为对角矩阵。
1.1.4. 矩阵的加法
-
把矩阵的对应位元素相加。
-
矩阵的形状必须一致,即必须是同型矩阵。
1.1.5. 矩阵的乘法
数与矩阵相乘
数值与矩阵每一个元素相乘。
矩阵与矩阵相乘
-
左矩阵的每一行与右矩阵的每一列,对应每一个元素相乘。
-
\(A \times B\),那么有A矩阵\(m \times n\),B 矩阵\(n \times k\),要求左侧矩阵的列数n,必须等于右侧矩阵的行数n,结果矩阵C为\(m \times k\)矩阵。
定义:
练习:
1.1.6. 矩阵的转置
-
把矩阵A的行换成相同序数的列,得到一个新矩阵,叫做A的转置矩阵,记作\(A^T\)
-
行变列,列变行
-
A 为\(m \times n\)矩阵,转置之后为\(n \times m\)矩阵
1.1.7. 矩阵的运算方法
加法
-
\(A + B = B + A\)
-
\((A + B) + C = A + (B + C)\)
乘法
-
\((λμ)A = λ(μA)\)
-
\((λ + μ)A = λA + μA\)
-
\(λ(A + B) = λA + λB\)
-
\((AB) C = A(BC)\)
-
\(λ(AB) = (λA)B = A(λB)\)
-
\(A(B + C) = AB + AC\)
-
\((B + C)A = BA + CA\)
减法
-
\(A - B = A + B × (-1)\)
-
\(A - A = A + (-A) = O\)
转置
-
\((A^T)^T = A\)
-
\((A + B)^T = A^T + B^T\)
-
\((λA)^T = λA^T\)
-
\((AB)^T = B^T A^T\)
1.1.8. 矩阵的逆
-
对于n阶方阵A,如果有一个n阶方阵B,使得\(AB = BA = E\),就称矩阵A是可逆的,并把B称为A的逆矩阵。
-
A的逆矩阵记作\(A^{-1}\),如果\(AB = BA = E\),则\(B = A-1\)。
1.2. 微积分基础知识
-
什么是导数
-
偏导数
-
方向导数和梯度
-
凸函数和凹函数
1.2.1. 什么是导数
-
导数反映的是函数y = f(x)在某一点处沿x轴正方向的变化率
-
在x轴上某一点处,如果f'(x)>0,说明f(x)的函数值在x点沿x轴正方向是趋于增加的;如果f'(x)<0,说明f(x)的函数值在x点沿x轴正方向是趋于减少的
1.2.2. 偏导数
-
导数与偏导数本质是一致的,都是当自变量的变化量趋于0时,函数值的变化量与自变量变化量比值的极限
-
偏导数也就是函数在某一点上沿某个坐标轴正方向的的变化率
-
导数指的是一元函数中,函数y=f(x)在某一点处沿x轴正方向的变化率;而偏导数,指的是多元函数中,函数\(y=f(x_1,x_2,…,x_n)\)在某一点处沿某一坐标轴\((x_1,x_2,…,x_n)\)正方向的变化率
1.2.3. 方向导数
-
函数某一点在某一趋近方向(向量方向)上的导数值
-
方向导数就是函数在除坐标轴正方向外,其他特定方向上的变化率
1.2.4. 梯度(Gradient)
问题:函数在变量空间的某一点处,沿着哪一个方向有最大的变化率?
答案:梯度。
-
定义:函数在某一点的梯度是这样一个向量,它的方向与取得最大方向导数的方向一致,而它的模为方向导数的最大值
-
梯度是一个向量,即有方向、有大小;
-
梯度的方向是最大方向导数的方向;梯度的值是最大方向导数的值
1.2.5. 凸函数和凹函数
凸函数
凸函数是具有如下特性的一个定义在某个向量空间的凸子集C(区间)上的实值函数f:对其定义域C上的任意两点\(x_{1},x_{2}\),总有\(f(\frac{x_1+x_2}{2}) \le \frac{f(x_1)+f(x_2)}{2}\)。
凹函数
我们称一个有实值函数f在某区间(或者某个向量空间中的凹集)上是凹的,如果对任意该区间内不相等的x和y和[0,1]中的任意t有
某函数f:R→R,在x和y之间的每一点z,在图中的点(z, f(z))是在以点(x, f(x)) and (y, f(y))连成的直线之上。
1.3. 概率论和统计学
-
常用统计变量
-
常见概率分布
-
重要概率公式
1.3.1. 常用统计变量
-
样本均值
-
样本方差
-
样本标准差
1.3.2. 常见概率分布
-
均匀分布
-
正态分布(高斯分布)
-
指数分布
1.3.3. 重要概率公式
-
条件概率公式
-
全概率公式
-
贝叶斯公式
2. 数学基础(精简进阶版)
2.1. 线性代数
下面分别概括了向量、矩阵、运算、范数、特征向量和特征值的概念。
2.1.1. 向量
本书中的向量指的是列向量。一个\(n\)维向量\(\boldsymbol{x}\)的表达式可写成
其中\(x_1, \ldots, x_n\)是向量的元素。我们将各元素均为实数的\(n\)维向量\(\boldsymbol{x}\)记作\(\boldsymbol{x} \in \mathbb{R}^{n}\)或\(\boldsymbol{x} \in \mathbb{R}^{n \times 1}\)。
2.1.2. 矩阵
一个\(m\)行\(n\)列矩阵的表达式可写成
其中\(x_{ij}\)是矩阵\(\boldsymbol{X}\)中第\(i\)行第\(j\)列的元素(\(1 \leq i \leq m, 1 \leq j \leq n\))。我们将各元素均为实数的\(m\)行\(n\)列矩阵\(\boldsymbol{X}\)记作\(\boldsymbol{X} \in \mathbb{R}^{m \times n}\)。不难发现,向量是特殊的矩阵。
2.1.3. 运算
设\(n\)维向量\(\boldsymbol{A}\)中的元素为\(a_1, \ldots, a_n\),\(n\)维向量\(\boldsymbol{b}\)中的元素为\(b_1, \ldots, b_n\)。向量\(\boldsymbol{A}\)与\(\boldsymbol{b}\)的点乘(内积)是一个标量:
设两个\(m\)行\(n\)列矩阵
矩阵\(\boldsymbol{A}\)的转置是一个\(n\)行\(m\)列矩阵,它的每一行其实是原矩阵的每一列:
两个相同形状的矩阵的加法是将两个矩阵按元素做加法:
我们使用符号\(\odot\)表示两个矩阵按元素做乘法的运算:
定义一个标量\(k\)。标量与矩阵的乘法也是按元素做乘法的运算:
其他诸如标量与矩阵按元素相加、相除等运算与上式中的相乘运算类似。矩阵按元素开根号、取对数等运算也就是对矩阵每个元素开根号、取对数等,并得到和原矩阵形状相同的矩阵。
矩阵乘法和按元素的乘法不同。设\(\boldsymbol{A}\)为\(m\)行\(p\)列的矩阵,\(\boldsymbol{B}\)为\(p\)行\(n\)列的矩阵。两个矩阵相乘的结果
是一个\(m\)行\(n\)列的矩阵,其中第\(i\)行第\(j\)列(\(1 \leq i \leq m, 1 \leq j \leq n\))的元素为
2.1.4. 范数
设\(n\)维向量\(\boldsymbol{x}\)中的元素为\(x_1, \ldots, x_n\)。向量\(\boldsymbol{x}\)的\(L_p\)范数为
例如,\(\boldsymbol{x}\)的\(L_1\)范数是该向量元素绝对值之和:
而\(\boldsymbol{x}\)的\(L_2\)范数是该向量元素平方和的平方根:
我们通常用\(\|\boldsymbol{x}\|\)指代\(\|\boldsymbol{x}\|_2\)。
设\(\boldsymbol{x}\)是一个\(m\)行\(n\)列矩阵。矩阵\(\boldsymbol{x}\)的Frobenius范数为该矩阵元素平方和的平方根:
其中\(x_{ij}\)为矩阵\(\boldsymbol{x}\)在第\(i\)行第\(j\)列的元素。
2.1.5. 特征向量和特征值
对于一个\(n\)行\(n\)列的矩阵\(\boldsymbol{A}\),假设有标量\(\lambda\)和非零的\(n\)维向量\(\boldsymbol{v}\)使
那么\(\boldsymbol{v}\)是矩阵\(\boldsymbol{A}\)的一个特征向量,标量\(\lambda\)是\(\boldsymbol{v}\)对应的特征值。
2.2. 微分
我们在这里简要介绍微分的一些基本概念和演算。
2.2.1. 导数和微分
假设函数\(f: \mathbb{R} \rightarrow \mathbb{R}\)的输入和输出都是标量。函数\(f\)的导数
且假定该极限存在。给定\(y = f(x)\),其中\(x\)和\(y\)分别是函数\(f\)的自变量和因变量。以下有关导数和微分的表达式等价:
其中符号\(\text{D}\)和\(\text{d}/\text{d}x\)也叫微分运算符。常见的微分演算有\(\text{D}C = 0\)(\(C\)为常数)、\(\text{D}x^n = nx^{n-1}\)(\(n\)为常数)、\(\text{D}e^x = e^x\)、\(\text{D}\ln(x) = 1/x\)等。
如果函数\(f\)和\(g\)都可导,设\(C\)为常数,那么
如果\(y=f(u)\)和\(u=g(x)\)都是可导函数,依据链式法则,
2.2.2. 泰勒展开
函数\(f\)的泰勒展开式是
其中\(f^{(n)}\)为函数\(f\)的\(n\)阶导数(求\(n\)次导数),\(n!\)为\(n\)的阶乘。假设\(\epsilon\)是一个足够小的数,如果将上式中\(x\)和\(a\)分别替换成\(x+\epsilon\)和\(x\),可以得到
由于\(\epsilon\)足够小,上式也可以简化成
2.2.3. 偏导数
设\(u\)为一个有\(n\)个自变量的函数,\(u = f(x_1, x_2, \ldots, x_n)\),它有关第\(i\)个变量\(x_i\)的偏导数为
以下有关偏导数的表达式等价:
为了计算\(\partial u/\partial x_i\),只需将\(x_1, \ldots, x_{i-1}, x_{i+1}, \ldots, x_n\)视为常数并求\(u\)有关\(x_i\)的导数。
2.2.4. 梯度
假设函数\(f: \mathbb{R}^n \rightarrow \mathbb{R}\)的输入是一个\(n\)维向量\(\boldsymbol{x} = [x_1, x_2, \ldots, x_n ]^\top\),输出是标量。函数\(f(\boldsymbol{x})\)有关\(\boldsymbol{x}\)的梯度是一个由\(n\)个偏导数组成的向量:
为表示简洁,我们有时用\(\nabla f(\boldsymbol{x})\)代替\(\nabla_{\boldsymbol{x}} f(\boldsymbol{x})\)。
假设\(\boldsymbol{x}\)是一个向量,常见的梯度演算包括
类似地,假设\(\boldsymbol{x}\)是一个矩阵,那么
2.2.5. 海森矩阵
假设函数\(f: \mathbb{R}^n \rightarrow \mathbb{R}\)的输入是一个\(n\)维向量\(\boldsymbol{x} = [x_1, x_2, \ldots, x_n ]^\top\),输出是标量。假定函数\(f\)所有的二阶偏导数都存在,\(f\)的海森矩阵\(\boldsymbol{H}\)是一个\(n\)行\(n\)列的矩阵:
其中二阶偏导数
2.3. 概率
最后,我们简要介绍条件概率、期望和均匀分布。
2.3.1. 条件概率
假设事件\(A\)和事件\(B\)的概率分别为\(P(A)\)和\(P(B)\),两个事件同时发生的概率记作\(P(A \cap B)\)或\(P(A, B)\)。给定事件\(B\),事件\(A\)的条件概率
也就是说,
当满足
时,事件\(A\)和事件\(B\)相互独立。
2.3.2. 期望
离散的随机变量\(X\)的期望(或平均值)为
2.3.3. 均匀分布
假设随机变量\(X\)服从\([a, b\)]上的均匀分布,即\(X \sim U(a, b)\)。随机变量\(X\)取\(a\)和\(b\)之间任意一个数的概率相等。
3. 机器学习简介
-
机器学习的概念
-
机器学习主要分类
-
监督学习三要素
-
监督学习模型评估策略
-
监督学习模型求解算法
3.1. 机器学习的概念
-
机器学习是什么
-
机器学习的开端
-
机器学习的定义
-
机器学习的过程
-
机器学习示例
3.1.1. 机器学习是什么
-
什么是学习
-
从人的学习说起
-
学习理论;从实践经验中总结
-
在理论上推导;在实践中检验
-
通过各种手段获取知识或技能的过程
-
-
机器怎么学习?
-
处理某个特定的任务,以大量的“经验”为基础
-
对任务完成的好坏,给予一定的评判标准
-
通过分析经验数据,任务完成得更好了
-
3.1.2. 机器学习的开端
-
1952年,IBM的Arthur Samuel(被誉为“机器学习之父”)设计了一款可以学习的西洋跳棋程序。
-
它能通过观察棋子的走位来构建新的模型,并用其提高自己的下棋技巧。
-
Samuel和这个程序进行多场对弈后发现,随着时间的推移,程序的棋艺变得越来越好。
Samuel九十岁去世,八十八岁还在写代码。。。 |
3.1.3. 机器学习的定义
-
机器学习(Machine Learning, ML)主要研究计算机系统对于特定任务的性能,逐步进行改善的算法和统计模型。
-
通过输入海量训练数据对模型进行训练,使模型掌握数据所蕴含的潜在规律,进而对新输入的数据进行准确的分类或预测。
-
是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸优化、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。
3.1.4. 机器学习的过程
海量数据 → 提炼规律 → 预测未来
3.2. 机器学习的分类
-
监督学习:提供数据并提供数据对应结果的机器学习过程。
-
无监督学习:提供数据并且不提供数据对应结果的机器学习过程。
-
强化学习:通过与环境交互并获取延迟返回进而改进行为的学习过程。
3.2.1. 监督学习
-
监督学习(Supervised Learning)算法构建了包含输入和所需输出的一组数据的数学模型。这些数据称为训练数据,由一组训练样本组成。
-
监督学习主要包括分类和回归。
-
当输出被限制为有限的一组值(离散数值)时使用分类算法;当输出可以具有范围内的任何数值(连续数值)时使用回归算法。
-
相似度学习是和回归和分类都密切相关的一类监督机器学习,它的目标是使用相似性函数从样本中学习,这个函数可以度量两个对象之间的相似度或关联度。它在排名、推荐系统、视觉识别跟踪、人脸识别等方面有很好的应用场景。
例子
所在街区 | 房屋价格 | 住房面积 | 住房格局 | 是否学区 | 是否售出 |
---|---|---|---|---|---|
海淀 |
7000000 |
120 |
三室一厅 |
是 |
是 |
朝阳 |
6000000 |
100 |
二室一厅 |
否 |
否 |
昌平 |
5000000 |
120 |
二室一厅 |
否 |
是 |
大兴 |
6500000 |
150 |
三室一厅 |
否 |
? |
3.2.2. 监督学习深入介绍
-
监督学习三要素
-
监督学习实现步骤
-
监督学习模型评估策略
-
分类和回归
-
监督学习模型求解算法
监督学习三要素
-
模型(model):总结数据的内在规律,用数学函数描述的系统
-
策略(strategy):选取最优模型的评价准则
-
算法(algorithm):选取最优模型的具体方法
监督学习实现步骤
-
得到一个有限的训练数据集
-
确定包含所有学习模型的集合
-
确定模型选择的准则,也就是学习策略
-
实现求解最优模型的算法,也就是学习算法
-
通过学习算法选择最优模型
-
利用得到的最优模型,对新数据进行预测或分析
监督学习过程示例
3.2.3. 模型评估策略
-
模型评估
-
训练集和测试集
-
损失函数和经验风险
-
训练误差和测试误差
-
-
模型选择
-
过拟合和欠拟合
-
正则化和交叉验证
-
训练集和测试集
-
我们将数据输入到模型中训练出了对应模型,但是模型的效果好不好呢?我们需要对模型的好坏进行评估
-
我们将用来训练模型的数据称为训练集,将用来测试模型好坏的集合称为测试集。
-
训练集:输入到模型中对模型进行训练的数据集合。
-
测试集:模型训练完成后测试训练效果的数据集合。
损失函数
-
损失函数用来衡量模型预测误差的大小。
-
定义:选取模型f为决策函数,对于给定的输入参数X,f(X)为预测结果,Y为真实结果;f(X)和Y之间可能会有偏差,我们就用一个损失函数(loss function)来度量预测偏差的程度,记作L(Y,f(X))
-
损失函数是系数的函数
-
损失函数值越小,模型就越好
常见损失函数
-
0-1损失函数
-
平方损失函数
-
绝对损失函数
-
对数损失函数
经验风险
-
经验风险
-
模型f(X)关于训练数据集的平均损失称为经验风险(empirial risk),记作\(R_{emp}\)
-
-
经验风险最小化(Empirical Risk Minimization,ERM)
-
这一策略认为,经验风险最小的模型就是最优的模型
-
样本足够大时,ERM有很好的学习效果,因为有足够多的“经验”
-
样本较小时,ERM就会出现一些问题
-
训练误差和测试误差
-
训练误差
-
训练误差(training error)是关于训练集的平均损失。
-
-
训练误差的大小,可以用来判断给定问题是否容易学习,但本质上并不重要
-
测试误差
-
-
测试误差(testing error)是关于测试集的平均损失。
-
测试误差真正反映了模型对未知数据的预测能力,这种能力一般被称为泛化能力。
过拟合和欠拟合
欠拟合
-
模型没有很好地捕捉到数据特征,特征集过小,导致模型不能很好地拟合数据,称之为欠拟合(under-fitting)
-
欠拟合的本质是对数据的特征“学习”得不够
-
例如,想分辨一只猫,只给出了四条腿、两只眼、有尾巴这三个特征,那么由此训练出来的模型根本无法分辨猫
过拟合
-
把训练数据学习的太彻底,以至于把噪声数据的特征也学习到了,特征集过大,这样就会导致在后期测试的时候不能够很好地识别数据,即不能正确的分类,模型泛化能力太差,称之为过拟合(over-fitting)。
-
例如,想分辨一只猫,给出了四条腿、两只眼、一条尾巴、叫声、颜色,能够捕捉老鼠、喜欢吃鱼、…,然后恰好所有的训练数据的猫都是白色,那么这个白色是一个噪声数据,会干扰判断,结果模型把颜色是白色也学习到了,而白色是局部样本的特征,不是全局特征,就造成了输入一个黑猫的数据,判断出不是猫。
例子
以上可以看到,当\(M=0,M=1\)时,是明显的欠拟合状态,有些数据没有拟合到。而当\(M=3\)时,虽然没有和样本点完全吻合,却是拟合的最好的一条曲线。而当\(M=9\)时,曲线严格吻合样本点,但却是严重的过拟合,为什么?因为在拟合或者说机器学习的过程中,我们把噪声也拟合进去了,所以造成了过拟合。
3.2.4. 模型的选择
-
当模型复杂度增大时,训练误差会逐渐减小并趋向于0;而测试误差会先减小,达到最小值之后再增大
-
当模型复杂度过大时,就会发生过拟合;所以模型复杂度应适当
3.2.5. 正则化
-
结构风险最小化(Structural Risk Minimization,SRM)
-
是在ERM基础上,为了防止过拟合而提出来的策略
-
在经验风险上加上表示模型复杂度的正则化项(regularizer),或者叫惩罚项
-
正则化项一般是模型复杂度的单调递增函数,即模型越复杂,正则化值越大
-
-
结构风险最小化的典型实现是正则化(regularization)
-
形式:
-
-
第一项是经验风险,第二项\(J(f)\)是正则化项,\(\lambda \geq 0\)是调整两者关系的系数
-
正则化项可以取不同的形式,比如,特征向量的\(L_1\)范数或\(L_2\)范数
例如:
的正则化项可以设置为
这样就可以很好的惩罚过拟合,因为一旦过拟合,系数将会非常的大,而这就造成了结构风险也会很大。
奥卡姆剃刀
-
奥卡姆剃刀(Occam‘s razor)原理:如无必要,勿增实体
-
正则化符合奥卡姆剃刀原理。它的思想是:在所有可能选择的模型中,我们应该选择能够很好地解释已知数据并且十分简单的模型
-
如果简单的模型已经够用,我们不应该一味地追求更小的训练误差,而把模型变得越来越复杂
3.2.6. 交叉验证
-
数据集划分
-
如果样本数据充足,一种简单方法是随机将数据集切成三部分:训练集(training set)、验证集(validation set)和测试集(test set)
-
训练集用于训练模型,验证集用于模型选择,测试集用于学习方法评估
-
-
数据不充足时,可以重复地利用数据——交叉验证(cross validation)
-
简单交叉验证
-
数据随机分为两部分,如70%作为训练集,剩下30%作为测试集
-
训练集在不同的条件下(比如参数个数)训练模型,得到不同的模型
-
在测试集上评价各个模型的测试误差,选出最优模型
-
-
S折交叉验证
-
将数据随机切分为S个互不相交、相同大小的子集;S-1个做训练集,剩下一个做测试集
-
重复进行训练集、测试集的选取,有S种可能的选择
-
-
留一交叉验证
-
3.2.7. 分类和回归
-
监督学习问题主要可以划分为两类,即分类问题和回归问题
-
分类问题预测数据属于哪一类别。 —— 离散
-
回归问题根据数据预测一个数值。 —— 连续
-
-
通俗地讲,分类问题就是预测数据属于哪一种类型,就像上面的房屋出售预测,通过大量数据训练模型,然后去预测某个给定房屋能不能出售出去,属于能够出售类型还是不能出售类型。
-
回归问题就是预测一个数值,比如给出房屋一些特征,预测房价
-
如果将上面的房屋出售的问题改为预测房屋出售的概率,得到的结果将不再是可以售出(1)和不能售出(0),将会是一个连续的数值,例如 0.5,这就变成了一个回归问题
分类问题
-
在监督学习中,当输出变量 Y 取有限个离散值时,预测问题就成了分类(classification)问题
-
监督学习从数据中学习一个分类模型或分类决策函数,称为分类器(classifier);分类器对新的输入进行预测,称为分类
-
分类问题包括学习和分类两个过程。学习过程中,根据已知的训练数据集利用学习方法学习一个分类器;分类过程中,利用已习得的分类器对新的输入实力进行分类
-
分类问题可以用很多学习方法来解决,比如k近邻、决策树、感知机、逻辑斯谛回归、支撑向量机、朴素贝叶斯法、神经网络等
精确率和召回率
-
评价分类器性能的指标一般是分类准确率(accuracy),它定义为分类器对测试集正确分类的样本数与总样本数之比
-
对于二类分类问题,常用的评价指标是精确率(precision)与召回率(recall)
-
通常以关注的类为正类,其它为负类,按照分类器在测试集上预测的正确与否,会有四种情况出现,它们的总数分别记作:
-
TP:将正类预测为正类的数目
-
FN:将正类预测为负类的数目
-
FP:将负类预测为正类的数目
-
TN:将负类预测为负类的数目
-
-
精确率
-
精确率指的是“所有预测为正类的数据中,预测正确的比例”
-
召回率
-
-
召回率指的是“所有实际为正类的数据中,被正确预测找出的比例”
回归问题
-
回归问题用于预测输入变量和输出变量之间的关系
-
回归模型就是表示从输入变量到输出变量之间映射的函数
-
回归问题的学习等价于函数拟合:选择一条函数曲线,使其很好地拟合已知数据,并且能够很好地预测未知数据
-
回归问题的分类
-
按照输入变量的个数:一元回归和多元回归
-
按照模型类型:线性回归和非线性回归
-
回归学习的损失函数 —— 平方损失函数
-
如果选取平方损失函数作为损失函数,回归问题可以用著名的最小二乘法(least squares)来求解
-
3.2.8. 模型求解算法(学习算法)
-
梯度下降算法
-
牛顿法和拟牛顿法
梯度下降算法
-
梯度下降(gradient descent)是一种常用的一阶优化方法,是求解无约束优化问题最简单、最经典的方法之一
-
梯度方向:函数变化增长最快的方向(变量沿此方向变化时函数增长最快)
-
负梯度方向:函数变化减少最快的方向(变量沿此方向变化时函数减少最快)
-
损失函数是系数的函数,那么如果系数沿着损失函数的负梯度方向变化,此时损失函数减少最快,能够以最快速度下降到极小值
-
沿着负梯度方向迭代,迭代后的\(\theta\)使损失函数\(J(\theta)\)更小:
-
比如我们在一座大山上的某处位置,由于我们不知道怎么下山,于是决定走一步算一步,也就是在每走到一个位置的时候,求解当前位置的梯度,沿着梯度的负方向,也就是当前最陡峭的位置向下走一步,然后继续求解当前位置梯度,向这一步所在位置沿着最陡峭最易下山的位置走一步。这样一步步的走下去,一直走到觉得我们已经到了山脚。当然这样走下去,有可能我们不能走到山脚,而是到了某一个局部的山谷处。
-
从上面的解释可以看出,梯度下降不一定能够找到全局的最优解,有可能是一个局部最优解
-
如果损失函数是凸函数,梯度下降法得到的解就一定是全局最优解
3.2.9. 牛顿法和拟牛顿法
牛顿法
迭代公式:
-
其中\(g_k=g(x^{(k)})=\nabla f(x^{(k)})\)是f(x) 的梯度向量在\(x^{(k)}\)的值,
-
\(H(x^{(k)})\)是f(x)的海塞矩阵在\(x^{(k)}\)的值
-
梯度下降法只考虑了一阶导数,而牛顿法考虑了二阶导数,因此收敛速度更快
拟牛顿法
-
牛顿法需要求解目标函数的海赛矩阵的逆矩阵,计算比较复杂
-
拟牛顿法通过正定矩阵近似海赛矩阵的逆矩阵,从而大大简化了计算过程
4. 机器学习模型简介
主要内容
监督学习
-
回归模型
-
线性回归
-
-
分类模型
-
感知机
-
K近邻(KNN)
-
决策树
-
逻辑斯蒂回归
-
无监督学习
-
聚类
-
K均值聚类
-
4.1. 使用Anaconda
4.2. 线性回归
-
线性回归(linear regression)是一种线性模型,它假设输入变量x和单个输出变量y之间存在线性关系
-
具体来说,利用线性回归模型,可以从一组输入变量x的线性组合中,计算输出变量y
-
一元线性回归(只有一个特征\(x\))
-
多元线性回归(有多个特征\(x_0,x_1,\dots,x_n\))
一元线性回归就是从上图中的一堆散点图,拟合出一条直线\(y=wx+b\)。这里的拟合其实就是机器学习。散点为训练数据,我们要从训练数据中学习出一条直线。 |
这里的模型就是线性回归模型。那么机器学习的算法呢?
-
最小二乘法
-
梯度下降法
注意我们这里还没有讲监督学习三要素中的策略呢! |
4.2.1. 损失函数
这里使用平方损失函数,并且没有正则化项。
4.2.2. 学习算法
求解\(w,b\)使得平方损失函数取得最小值。
-
最小二乘法
-
梯度下降法
4.2.3. 最小二乘法
-
基于均方误差最小化来进行模型求解的方法称为“最小二乘法”(least square method)。
-
它的主要思想就是选择未知参数,使得理论值与观测值之差的平方和达到最小。
-
我们假设输入属性(特征)的数目只有一个:就是\(f(x)=wx+b\)中的\(x\)。我们的训练数据是\((x_1,y_1),(x_2,y_2),\dots,(x_n,y_n)\)。要从这些训练数据中学习一个模型出来,使得:\(f(x_i)=wx_i+b, f(x_i) \approx y_i\)。
-
在线性回归中,最小二乘法就是试图找到一条直线,使所有样本到直线上的欧式距离之和最小。
以上就是监督学习三要素中的策略! |
最小二乘法求解过程
-
求解\(w\)和\(b\),使得\(E_{(w,b)}=\sum_{i=1}^m (wx_i+b-y_i)^2\)最小化的过程,称为线性回归模型的“最小二乘估计”。
-
将\(E_{(w,b)}\)分别对\(w\)和\(b\)进行求导,可以得到:
-
令偏导数都为0,可以得到:
将\(b\)代入到第一个式子中,可以解得\(w\)的值,然后再将\(w\)的值代入到b的表达式,即可解得\(b\)。
其中:\(\bar{x}=\frac{1}{m}\sum_{i=1}^m x_i\)
4.2.4. 梯度下降法求解过程
求解\(w\)和\(b\),使得\(E_{(w,b)}=\sum_{i=1}^m (wx_i+b-y_i)^2\)达到极小值,也可以使用梯度下降法。
梯度为:
这里不是令偏导数为零了,而是准备梯度下降。 |
以上,\(w,b\)的初始值,\(\alpha\)的值,以及迭代的次数,都是超参数。因为不是机器学习学习出来的参数。 |
-
\(\alpha\)在梯度下降算法中被称作为学习率或者步长
-
这意味着我们可以通过\(\alpha\)来控制每一步走的距离,以保证不要走太快,错过了最低点;同时也要保证收敛速度不要太慢
-
所以\(\alpha\)的选择在梯度下降法中往往是很重要的,不能太大也不能太小
4.2.5. 梯度下降法和最小二乘法
-
相同点
-
本质和目标相同:两种方法都是经典的学习算法,在给定已知数据的前提下利用求导算出一个模型(函数),使得损失函数最小,然后对给定的新数据进行估算预测
-
-
不同点
-
损失函数:梯度下降可以选取其它损失函数,而最小二乘一定是平方损失函数
-
实现方法:最小二乘法是直接求导找出全局最小;而梯度下降是一种迭代法
-
效果:最小二乘找到的一定是全局最小,但计算繁琐,且复杂情况下未必有解;梯度下降迭代计算简单,但找到的一般是局部最小,只有在目标函数是凸函数时才是全局最小;到最小点附近时收敛速度会变慢,且对初始点的选择极为敏感
-
4.3. 感知机(Perceptron)
4.3.1. 感知机的定义
感知机是机器学习应用中分类的最简单的一种算法。如下图所示:感知机划分超平面
感知机是二分类的线性模型,输入是实例的特征向量,输出是实例的类别;假设训练的数据集是线性可分的,感知机的目标就是求一个能够将训练集的正负样本完全正确分离开的超平面(也就是上图中所示的那些将蓝、黄数据点完全分离开的直线)。但是如果这些数据是非线性可分的,这个超平面是无法获取的。上图的坐标轴,横坐标为\(X_1\),纵坐标为\(X_2\)。图中的每一个点都由\((X_1,X_2)\)所决定。举个实例:有一批零件,判断零件是否合格有两个重要点,长度和重量。\(X_1\)表示长度,\(X_2\)表示重量,上图的两条黑线表示零件的长度均值和重量均值。只有当长度和重量都满足一定条件,该零件才为合格品。都不满足一定条件,视为不可修复的劣质品,直接丢弃。那么机器学习如何学习到这个规则呢?我们在代码实现的时候,拿到手的是所有样本的信息\((X_1,X_2)\)和标签(0或1),标签里面0表示不合格品,1表示合格品。简单的说就是图片上黄色和蓝色的点。根据我们手上的这些点,我们需要找到一条直线将上面的点完美的分开。这样的直线我们可以找到很多条,那么哪一条才是最好的呢?实际上,感知机只是一个二分类的问题,无法找到一条最佳的直线,只需要能把所有的点都分开就好。我们设定损失函数为所有分错的点和直线的距离求和,然后训练,使这段求和的数值最小(最优的情况是0,因为0代表完全分开了),那么这条直线就满足我们的条件,就是我们所找的。
4.3.2. 感知机的数学原理
首先,点\(P(x_0,y_0)\)到直线\(Ax+By+C=0\)的距离为:
类似的:设超平面公式为:\(h=wx+b\),其中\(w=(w_0,w_1,w_2,\dots,w_n),x=(x_0,x_1,x_2,\dots,x_n)\)。其中样本点\(x'\)到超平面的距离为:
那么这个超平面为什么设置为\(wx+b\)呢?它和我们常见的\(ax+b\)有什么区别呢?
本质没啥区别,\(ax+b\)是二维中的,\(wx+b\)是高维中的。就看你的理解啦,简单的来说,\(wx+b\)是一个\(n\)维空间中的超平面\(S\),其中\(w\)是超平面的法向量,\(b\)是超平面的截距,这个超平面将特征空间划分成两部分,位于两部分的点分别被分为正负两类,所以,超平面S称为分离超平面。其中\(w=(w_0,w_1,w_2,\dots,w_n),x=(x_0,x_1,x_2,\dots,x_n)\)。
细节:
\(w\)是超平面的法向量:对于一个平面来说\(w\)就是这么定义的。数学上就这么定义的。
\(b\)是超平面的截距:可以按照二维中的\(ax+b\)理解。
特征空间:也就是整个\(n\)维空间,样本的每个属性都叫一个特征,特征空间的意思就是在这个空间中可以找到样本所有的属性组合。
4.3.3. 感知机的模型
感知机的模型:输入空间—>输出空间:
sign函数很简单,当x大于等于0,sign输出1,否则输出-1。那么往前想一下,\(wx+b\)如果大于等于0,\(f(x)\)就等于1,反之\(f(x)\)等于-1。
4.3.4. 感知机的损失函数
我们定义样本\((x_i,y_i)\),如果上面的距离\(d > 0\),则\(y_i=1\);如果\(d < 0\),则\(y_i=-1\),这样取\(y\)有一个好处,就是方便定义损失函数。优化的目标:期望使误分类的所有样本,到超平面的距离之和最小。
所以定义损失函数为:
其中M集合就是误分类点的集合。
不考虑前面的系数,感知机模型的损失函数为:
4.3.5. 感知机学习算法
感知机学习算法是对于上述损失函数进行极小化,求得\(w\)和\(b\)。这里使用随机梯度下降法(SGD),因为误分类的M集合里面的样本才能参加损失函数的优化。
目标函数如下:
算法步骤
输入:训练数据集\(T=(x_1,y_1),(x_2,y_2),\dots,(x_N,y_N),y_i \in \{-1,+1\},学习率 \eta (0 < \eta < 1)\)
输出:\(w,b\);感知机模型\(f(x)=sign(wx+b)\)
-
赋初值\(w_0,b_0\)
-
选取数据点\((x_i,y_i)\)
-
判断该数据点是否为当前模型的误分类点,即判断若\(y_i(wx_i+b) \le 0\)则更新:
-
转到2,直到训练集中没有误分类点。
4.4. 逻辑回归(Logistic Regression)
4.4.1. 直观理解
逻辑回归虽然叫回归,但其实是分类算法。 |
我们还记不记得感知机一节中给出的这张图?还记得感知机是怎么工作的吗?
感知机生成一个超平面(在二维中表现为一条直线),这个超平面可以将图中的两类点区分开。像下面这张图,图中的所有直线都可以作为感知机的超平面(因为它们都能把训练集中的点完美地分开)。
但是感知机存在一个很重要的问题,我们只用\(sign(wx+b)\)输出的\(+1\)和\(-1\)来判断点的类别是不是太简单了?是不是有点生硬的感觉。
这么简单的判别方式真的会很有效吗?
当然了,虽然我们已经程序测试过正确率很高,但总是让人有点担心是否在很多情况下都能很好地工作。事实上我们从小到大一直会听到一些升学考试差一分两分的例子,那么差一分和高一分的学生真的就是天壤之别吗?感知机也是如此:在超平面左侧距离原点0.001的点和右侧距离原点0.001的点输出就是+1和-1这天壤之别的差距真的合适吗?
此外我们也知道机器学习中通常会对目标函数进行微分(求导)进而梯度下降,但我们看上面这张图。很明显\(x=0\)是跳跃间断点,因此\(sign\)是一条不光滑的函数,没有办法进行微分。咦?那我记得感知器用了梯度下降,它是怎么去梯度下降的?
我们回忆一下感知器的梯度下降方式,确实用了梯度下降,但是发现没有,它把sign的壳子去掉了,对sign内部进行梯度下降,相对于其他能直接微分的算法来说,感知器的这种方式确实有点不太好。
感知机算法:
输入:训练数据集\(T=(x_1,y_1),(x_2,y_2),\dots,(x_N,y_N),y_i \in \{-1,+1\},学习率 \eta (0 < \eta < 1)\)
输出:\(w,b\);感知机模型\(f(x)=sign(wx+b)\)
-
赋初值\(w_0,b_0\)
-
选取数据点\((x_i,y_i)\)
-
判断该数据点是否为当前模型的误分类点,即判断若\(y_i(wx_i+b) \le 0\)则更新:
-
转到2,直到训练集中没有误分类点。
因此既然我们要对感知器进行优化,那么上文的这两个主要的缺陷咱们得想想法子看看能不能弥补。我们首先试试解决第一个问题:
-
怎么解决极小距离差别带来的+1和-1的天壤之别?
在逻辑斯蒂回归中大致思想与感知器相同,但在输出\(+1\)与\(-1\)之间存在一些差别。我们知道\(P(Y|X)\),它表示在给定\(X\)条件下,\(Y\)发生的概率。逻辑回归也使用了同样的概念,它使用\(p(Y=-1|X)\)和\(p(Y=1|X)\)来表示该样本分别为\(-1\)或\(1\)的概率(实际上逻辑回归并非强制要求标签必须为1或-1,可以用任意标签来表示)。这样当再出现样本\(X_1\)距离为\(-0.001\)时,可能\(P(Y=1|X_1)=0.49,P(Y=0|X_1)=0.51\),那么我们觉得\(X_1\)为0的概率更大一点,但我们同时也清楚程序可能并不太确定\(X_1\)一定为\(0\)。
使用概率作为输出结果使得样本在距离很小的差别下不再强制地输出\(+1\)和\(-1\)这两种天壤之别的结果,而是通过概率的方式告诉你结果可能是多少,同时也告诉你预测的不确信程度。这样子看起来让人比较安心一点不是吗?
-
怎么让最终的预测式子可微呢?
虽然无法微分并不会阻碍感知器的正常工作(事实上只是避开了sign),但对于很多场合都需要微分的机器学习来说,能找到一个可以微分的最终式子是很重要的。为了解决第一个问题我们提出了一种概率输出模型,那么感知器的\(sign\)也需要被随之替换为一种能输出概率大小的函数。具体函数在下文中详细讲解,其中值得高兴的是,我们找到的概率输出式子是平滑的,可微的,所以第二个问题也就迎刃而解了。
4.4.2. 逻辑回归模型
与感知器一样,我们先不管内部怎么工作,最好能够得到一个函数\(f(x)\),我们将样本\(x\)放进去以后,它告诉我们属于\(1\)的概率是多少。比如说\(f(y=1|x)=0.2,f(y=0|x)=0.8\),我们就知道该样本的标签大概率是\(0\)。总结一下也就是分别比较两种类别的概率大小,概率大的那一方则作为预测类别进行输出。\(f\)函数的定义如下所示。
二项逻辑回归模型是如下的条件概率分布:
这里,\(x \in R^n\)是输入,\(Y \in \{ 0,1 \}\)是输出,\(w \in R^n\)和\(b \in R\)是参数,\(w\)称为权值向量(weight),\(b\)称为偏置(bias),\(w \cdot x\)为\(w\)和\(x\)的内积。
下图就是\(f(w \cdot x+b)\)的图像,我们假设一下点\(X\)在超平面右边,那么距离应当为正,如果距离无穷大时,\(\exp{(w \cdot x + b)}\)无穷大,\(P(Y=1|x)=1\),也就是概率为\(1\),表示极其确信。如果\(w \cdot x+b\)是一个很接近\(0\)的数,\(\exp(w \cdot x + b)\)接近\(1\),\(P(Y=1|x)=0.5\),表示两边都有可能,不太确定。我们对于每一个样本点分别计算属于两类的概率,概率较大的一方作为预测的输出。
其实这里的函数\(\phi (z)=\frac{1}{1+e^{-z}}\),有一个很有名的名字叫做Sigmoid Function,或者叫压缩函数,在神经网络里面经常用到。 |
4.4.3. 损失函数
好了,所要用的几个函数我们都有了,接下来要做的就是根据给定的训练集,把参数\(w\)给求出来了。要找参数\(w\),首先就是得把损失函数(cost function)给定义出来,也就是目标函数。
我们第一个想到的自然是模仿线性回归的做法,利用误差平方和来当代价函数。
其中\((x_i,y_i)\)为样本点。\(y_i\)为真实值,\(\phi (w \cdot x_i + b)\)为预测值。
我们发现,逻辑回归的平方损失函数是一个非凸函数,这就意味着损失函数有着许多的局部极小值,这不利于我们求解最小值。
所以我们这里使用对数损失函数。
-
对数损失函数
也就是说
也就是说
当样本值\(y=1\),而预测值\(\phi (wx+b) = \frac{\exp{(wx+b)}}{1+\exp{(wx+b)}}\)接近于0时,也就是说完全预测错误,那么惩罚将是很大的,是无穷大。\(y=0\)时同理。
我们可以得出
4.4.4. 梯度下降
Sigmoid Function有一个很好的性质: |
求偏导数的过程
4.5. KNN
-
最简单最初级的分类器,就是将全部的训练数据所对应的类别都记录下来,当测试对象的属性和某个训练对象的属性完全匹配时,便可以对其进行分类
-
K近邻(k-nearest neighbour, KNN)是一种基本分类方法,通过测量不同特征值之间的距离进行分类。它的思路是:如果一个样本在特征空间中的k个最相似(即特征空间中最邻近)的样本中的大多数属于某一个类别,则该样本也属于这个类别,其中K通常是不大于20的整数
-
KNN算法中,所选择的邻居都是已经正确分类的对象
4.5.1. KNN例子
-
绿色圆要被决定赋予哪个类,是红色三角形还是蓝色四方形?
-
如果K=3,由于红色三角形所占比例为2/3,绿色圆将被赋予红色三角形那个类,如果K=5,由于蓝色四方形比例为3/5,因此绿色圆被赋予蓝色四方形类
-
KNN算法的结果很大程度取决于K的选择
4.5.2. KNN距离计算
KNN中,通过计算对象间距离来作为各个对象之间的非相似性指标,避免了对象之间的匹配问题,在这里距离一般使用欧氏距离或曼哈顿距离:
-
欧式距离
-
曼哈顿距离
4.5.3. KNN算法
在训练集中数据和标签已知的情况下,输入测试数据,将测试数据的特征与训练集中对应的特征进行相互比较,找到训练集中与之最为相似的前K个数据,则该测试数据对应的类别就是K个数据中出现次数最多的那个分类,其算法的描述为:
-
计算测试数据与各个训练数据之间的距离;
-
按照距离的递增关系进行排序;
-
选取距离最小的K个点;
-
确定前K个点所在类别的出现频率;
-
返回前K个点中出现频率最高的类别作为测试数据的预测分类。
4.6. 决策树
5. 推荐系统项目实战
5.1. 项目体系架构设计
5.1.1. 项目系统架构
项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法来提供混合推荐。提供了从前端应用、后台服务、算法设计实现、平台部署等多方位的闭环的业务实现。
电商推荐系统架构图
架构图代码,使用mermaid渲染出架构图
graph TD
A[前端可视化, Angular] --> B[后端业务逻辑, Spring]
B --> C[业务数据库]
C --> D[持久化: MongoDB]
C --> E[缓存: Redis]
D --> F[离线推荐服务, Spark MLlib]
D --> G[离线统计服务, Spark SQL]
B --> H[日志采集服务, Flume]
H --> I[消息队列, Kafka]
I --> J[实时推荐, Spark Streaming]
E --> J
-
用户可视化:主要负责实现和用户的交互以及业务数据的展示,主体采用AngularJS2进行实现,部署在Apache服务上。
-
综合业务服务:主要实现JavaEE层面整体的业务逻辑,通过Spring进行构建,对接业务需求。部署在Tomcat上。
5.1.2. 数据存储部分
-
业务数据库:项目采用广泛应用的文档数据库MongoDB作为主数据库,主要负责平台业务逻辑数据的存储。
-
缓存数据库:项目采用Redis作为缓存数据库,主要用来支撑实时推荐系统部分对于数据的高速获取需求。
离线推荐部分
-
离线统计服务:批处理统计性业务采用Spark Core + Spark SQL进行实现,实现对指标类数据的统计任务。
-
离线推荐服务:离线推荐业务采用Spark Core + Spark MLlib进行实现,采用ALS算法进行实现。
-
基于内容的推荐:采用TF-IDF算法提取UGC标签的关键词,计算商品之间的余弦相似度。
-
基于物品的协同过滤推荐:实现传统的Item-CF算法。
实时推荐部分
-
日志采集服务:通过利用Flume-ng对业务平台中用户对于电影的一次评分行为进行采集,实时发送到Kafka集群。
-
消息缓冲服务:项目采用Kafka作为流式数据的缓存组件,接受来自Flume的数据采集请求。并将数据推送到项目的实时推荐系统部分。
-
实时推荐服务:项目采用Spark Streaming作为实时推荐系统,通过接收Kafka中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到MongoDB数据库。
5.2. 项目数据流程
电商推荐系统数据流图
略
5.2.1. 系统初始化部分
-
通过Spark SQL将系统初始化数据加载到MongoDB中。 【离线推荐部分】
-
离线统计服务从MongoDB中加载数据,将【商品平均评分统计】、【商品评分个数统计】、【最近商品评分个数统计】三个统计算法进行运行实现,并将计算结果回写到MongoDB中;离线推荐服务从MongoDB中加载数据,通过ALS算法分别将【用户推荐结果矩阵】、【商品相似度矩阵】回写到MongoDB中。
5.2.2. 实时推荐部分
-
Flume从综合业务服务的运行日志中读取日志更新,并将更新的日志实时推送到Kafka中;Kafka在收到这些日志之后,通过kafkaStream程序对获取的日志信息进行过滤处理,获取用户评分数据流【userId|productId|score|timestamp】,并发送到另外一个Kafka队列;Spark Streaming监听Kafka队列,实时获取Kafka过滤出来的用户评分数据流,融合存储在Redis中的用户最近评分队列数据,提交给实时推荐算法,完成对用户新的推荐结果计算;计算完成之后,将新的推荐结构和MongDB数据库中的推荐结果进行合并。
5.2.3. 业务系统部分
-
推荐结果展示部分,从MongoDB中将离线推荐结果、实时推荐结果、内容推荐结果进行混合,综合给出相对应的数据。
-
商品信息查询服务通过对接MongoDB实现对商品信息的查询操作。
-
商品评分部分,获取用户通过UI给出的评分动作,后台服务进行数据库记录后,一方面将数据推动到Redis群中,另一方面,通过预设的日志框架输出到Tomcat中的日志中。
-
商品标签部分,项目提供用户对电影打标签服务。
5.3. 数据模型
-
商品数据表
字段名 | 字段类型 | 字段描述 | 字段备注 |
---|---|---|---|
productId |
Int |
商品ID |
|
name |
String |
商品名字 |
|
categories |
String |
商品分类 |
|
imageUrl |
String |
图片url |
|
tags |
String |
UGC标签 |
-
评分数据表
字段名 | 字段类型 | 字段描述 | 字段备注 |
---|---|---|---|
userid |
Int |
用户ID |
|
productId |
Int |
商品ID |
|
score |
Double |
商品分值 |
|
timestamp |
Long |
评分时间 |
-
用户表
字段名 | 字段类型 | 字段描述 | 字段备注 |
---|---|---|---|
productId |
Int |
用户的ID |
|
username |
String |
用户名 |
|
password |
String |
用户密码 |
|
first |
boolean |
用于是否第一次登录 |
|
timestamp |
Long |
用户创建的时间 |
5.4. 工具环境搭建
我们的项目中用到了多种工具进行数据的存储、计算、采集和传输,本章主要简单介绍设计的工具环境搭建。
如果机器的配置不足,推荐只采用一台虚拟机进行配置,而非完全分布式,将该虚拟机CPU的内存设置的尽可能大,推荐为CPU > 4、MEM > 4GB。
5.4.1. MongoDB(单节点)环境配置
// 通过WGET下载Linux版本的MongoDB
[bigdata@linux ~]$ wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel62-3.4.3.tgz
// 将压缩包解压到指定目录
[bigdata@linux backup]$ tar -xf mongodb-linux-x86_64-rhel62-3.4.3.tgz -C ~/
// 将解压后的文件移动到最终的安装目录
[bigdata@linux ~]$ mv mongodb-linux-x86_64-rhel62-3.4.3/ /usr/local/mongodb
// 在安装目录下创建data文件夹用于存放数据和日志
[bigdata@linux mongodb]$ mkdir /usr/local/mongodb/data/
// 在data文件夹下创建db文件夹,用于存放数据
[bigdata@linux mongodb]$ mkdir /usr/local/mongodb/data/db/
// 在data文件夹下创建logs文件夹,用于存放日志
[bigdata@linux mongodb]$ mkdir /usr/local/mongodb/data/logs/
// 在logs文件夹下创建log文件
[bigdata@linux mongodb]$ touch /usr/local/mongodb/data/logs/ mongodb.log
// 在data文件夹下创建mongodb.conf配置文件
[bigdata@linux mongodb]$ touch /usr/local/mongodb/data/mongodb.conf
// 在mongodb.conf文件中输入如下内容
[bigdata@linux mongodb]$ vim ./data/mongodb.conf
配置文件内容:
#端口号port = 27017
#数据目录
dbpath = /usr/local/mongodb/data/db
#日志目录
logpath = /usr/local/mongodb/data/logs/mongodb.log
#设置后台运行
fork = true
#日志输出方式
logappend = true
完成MongoDB的安装后,启动MongoDB服务器:
// 启动MongoDB服务器
[bigdata@linux mongodb]$ sudo /usr/local/mongodb/bin/mongod -config /usr/local/mongodb/data/mongodb.conf
// 访问MongoDB服务器
[bigdata@linux mongodb]$ /usr/local/mongodb/bin/mongo
// 停止MongoDB服务器
[bigdata@linux mongodb]$ sudo /usr/local/mongodb/bin/mongod -shutdown -config /usr/local/mongodb/data/mongodb.conf
5.4.2. Redis(单节点)环境配置
// 通过WGET下载REDIS的源码
[bigdata@linux ~]$wget http://download.redis.io/releases/redis-4.0.2.tar.gz
// 将源代码解压到安装目录
[bigdata@linux ~]$ tar -xf redis-4.0.2.tar.gz -C ~/
// 进入Redis源代码目录,编译安装
[bigdata@linux ~]$ cd redis-4.0.2/
// 安装GCC
[bigdata@linux ~]$ sudo yum install gcc
// 编译源代码
[bigdata@linux redis-4.0.2]$ make MALLOC=libc
// 编译安装
[bigdata@linux redis-4.0.2]$ sudo make install
// 创建配置文件
[bigdata@linux redis-4.0.2]$ sudo cp ~/redis-4.0.2/redis.conf /etc/
// 修改配置文件中以下内容
[bigdata@linux redis-4.0.2]$ sudo vim /etc/redis.conf
daemonize yes #37行 #是否以后台daemon方式运行,默认不是后台运行
pidfile /var/run/redis/redis.pid #41行 #redis的PID文件路径(可选)
bind 0.0.0.0 #64行 #绑定主机IP,默认值为127.0.0.1,我们是跨机器运行,所以需要更改
logfile /var/log/redis/redis.log #104行 #定义log文件位置,模式log信息定向到stdout,输出到/dev/null(可选)
dir “/usr/local/rdbfile” #188行 #本地数据库存放路径,默认为./,编译安装默认存在在/usr/local/bin下(可选)
在安装完Redis之后,启动Redis
// 启动Redis服务器
[bigdata@linux redis-4.0.2]$ redis-server /etc/redis.conf
// 连接Redis服务器
[bigdata@linux redis-4.0.2]$ redis-cli
// 停止Redis服务器
[bigdata@linux redis-4.0.2]$ redis-cli shutdown
在安装完Redis之后,启动Redis
// 启动Redis服务器
[bigdata@linux redis-4.0.2]$ redis-server /etc/redis.conf
// 连接Redis服务器
[bigdata@linux redis-4.0.2]$ redis-cli
// 停止Redis服务器
[bigdata@linux redis-4.0.2]$ redis-cli shutdown
5.4.3. Spark(单节点)环境配置
// 通过wget下载zookeeper安装包
[bigdata@linux ~]$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
// 将spark解压到安装目录
[bigdata@linux ~]$ tar –xf spark-2.1.1-bin-hadoop2.7.tgz –C ./cluster
// 进入spark安装目录
[bigdata@linux cluster]$ cd spark-2.1.1-bin-hadoop2.7/
// 复制slave配置文件
[bigdata@linux spark-2.1.1-bin-hadoop2.7]$ cp ./conf/slaves.template ./conf/slaves
// 修改slave配置文件
[bigdata@linux spark-2.1.1-bin-hadoop2.7]$ vim ./conf/slaves
linux #在文件最后将本机主机名进行添加
// 复制Spark-Env配置文件
[bigdata@linux spark-2.1.1-bin-hadoop2.7]$ cp ./conf/spark-env.sh.template ./conf/spark-env.sh
SPARK_MASTER_HOST=linux #添加spark master的主机名
SPARK_MASTER_PORT=7077 #添加spark master的端口号
安装完成之后,启动Spark
// 启动Spark集群
[bigdata@linux spark-2.1.1-bin-hadoop2.7]$ sbin/start-all.sh
// 访问Spark集群,浏览器访问http://linux:8080
// 关闭Spark集群
[bigdata@linux spark-2.1.1-bin-hadoop2.7]$ sbin/stop-all.sh
5.4.4. Zookeeper(单节点)环境配置
// 通过wget下载zookeeper安装包
[bigdata@linux ~]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
// 将zookeeper解压到安装目录
[bigdata@linux ~]$ tar –xf zookeeper-3.4.10.tar.gz –C ./cluster
// 进入zookeeper安装目录
[bigdata@linux cluster]$ cd zookeeper-3.4.10/
// 创建data数据目录
[bigdata@linux zookeeper-3.4.10]$ mkdir data/
// 复制zookeeper配置文件
[bigdata@linux zookeeper-3.4.10]$ cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
// 修改zookeeper配置文件
[bigdata@linux zookeeper-3.4.10]$ vim conf/zoo.cfg
dataDir=/home/bigdata/cluster/zookeeper-3.4.10/data #将数据目录地址修改为创建的目录
// 启动Zookeeper服务
[bigdata@linux zookeeper-3.4.10]$ bin/zkServer.sh start
// 查看Zookeeper服务状态
[bigdata@linux zookeeper-3.4.10]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/bigdata/cluster/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: standalone
// 关闭Zookeeper服务
[bigdata@linux zookeeper-3.4.10]$ bin/zkServer.sh stop
5.4.5. Flume-ng(单节点)环境配置
// 通过wget下载zookeeper安装包
[bigdata@linux ~]$ wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
// 将zookeeper解压到安装目录
[bigdata@linux ~]$ tar –xf apache-flume-1.8.0-bin.tar.gz –C ./cluster
// 等待项目部署时使用
5.4.6. Kafka(单节点)环境配置
// 通过wget下载zookeeper安装包
[bigdata@linux ~]$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
// 将kafka解压到安装目录
[bigdata@linux ~]$ tar –xf kafka_2.12-0.10.2.1.tgz –C ./cluster
// 进入kafka安装目录
[bigdata@linux cluster]$ cd kafka_2.12-0.10.2.1/
// 修改kafka配置文件
[bigdata@linux kafka_2.12-0.10.2.1]$ vim config/server.properties
host.name=linux #修改主机名
port=9092 #修改服务端口号
zookeeper.connect=linux:2181 #修改Zookeeper服务器地址
// 启动kafka服务 !!! 启动之前需要启动Zookeeper服务
[bigdata@linux kafka_2.12-0.10.2.1]$ bin/kafka-server-start.sh -daemon ./config/server.properties
// 关闭kafka服务
[bigdata@linux kafka_2.12-0.10.2.1]$ bin/kafka-server-stop.sh
// 创建topic
[bigdata@linux kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper linux:2181 --replication-factor 1 --partitions 1 --topic recommender
// kafka-console-producer
[bigdata@linux kafka_2.12-0.10.2.1]$ bin/kafka-console-producer.sh --broker-list linux:9092 --topic recommender
// kafka-console-consumer
[bigdata@linux kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --bootstrap-server linux:9092 --topic recommender
5.5. 创建项目并初始化业务数据
我们的项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。
5.5.1. 在IDEA中创建maven项目
打开IDEA,创建一个maven项目,命名为ECommerceRecommendSystem。为了方便后期的联调,我们会把业务系统的代码也添加进来,所以我们可以以ECommerceRecommendSystem作为父项目,并在其下建一个名为recommender的子项目,然后再在下面搭建多个子项目用于提供不同的推荐服务。
5.5.2. 项目框架搭建
在ECommerceRecommendSystem的pom.xml文件中加入元素`<packaging>pom</packaging>`,然后新建一个maven module作为子项目,命名为recommender。同样的,再以recommender为父项目,在它的pom.xml中加入`<packing>pom</packaging>`,然后新建一个maven module作为子项目。我们的第一步是初始化业务数据,所以子项目命名为DataLoader。
父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以ECommerceRecommendSystem和recommender下的src文件夹都可以删掉。
目前的整体项目框架如下:
略
5.5.3. 声明项目中工具的版本信息
我们整个项目需要用到多个工具,它们的不同版本可能会对程序运行造成影响,所以应该在最外层的ECommerceRecommendSystem中声明所有子项目共用的版本信息。 在pom.xml中加入以下配置:
ECommerceRecommendSystem/pom.xml
<properties>
<mysql.version>6.0.5</mysql.version>
<shiro.version>1.3.2</shiro.version>
<spring.version>4.3.6.RELEASE</spring.version>
<spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
<log4j.version>1.2.17</log4j.version>
<quartz.version>2.2.3</quartz.version>
<slf4j.version>1.7.22</slf4j.version>
<hibernate.version>5.2.6.Final</hibernate.version>
<camel.version>2.18.2</camel.version>
<freemarker.version>2.3.23</freemarker.version>
<config.version>1.10</config.version>
<jackson.version>2.8.6</jackson.version>
<servlet.version>3.0.1</servlet.version>
<net.sf.json.version>2.4</net.sf.json.version>
<activemq.version>5.14.3</activemq.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
<mongodb-spark.version>2.0.0</mongodb-spark.version>
<casbah.version>3.1.1</casbah.version>
<elasticsearch-spark.version>5.6.2</elasticsearch-spark.version>
<elasticsearch.version>5.6.2</elasticsearch.version>
<jblas.version>1.2.1</jblas.version>
</properties>
5.5.4. 添加项目依赖
首先,对于整个项目而言,应该有同样的日志管理,我们在ECommerceRecommendSystem中引入公有依赖:
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Logging End -->
</dependencies>
同样,对于maven项目的构建,可以引入公有的插件:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
然后,在recommender模块中,我们可以为所有的推荐模块声明spark相关依赖(这里的dependencyManagement表示仅声明相关信息,子项目如果依赖需要自行引入):
ECommerceRecommendSystem/recommender/pom.xml
<dependencyManagement>
<dependencies>
<!-- 引入Spark相关的Jar包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
由于各推荐模块都是scala代码,还应该引入scala-maven-plugin插件,用于scala程序的编译。因为插件已经在父项目中声明,所以这里不需要再声明版本和具体配置:
<build>
<plugins>
<!-- 如果父项目有声明plugin,那么子项目在引入的时候,不用声明版本和父项目已经声明的配置 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
对于具体的DataLoader子项目,需要spark相关组件,还需要mongodb的相关依赖,我们在pom.xml文件中引入所有依赖(在父项目中已声明的不需要再加详细信息):
ECommerceRecommendSystem/recommender/DataLoader/pom.xml
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.9.1</version>
</dependency>
</dependencies>
5.6. 数据加载准备
在src/main/目录下,可以看到已有的默认源文件目录是java,我们可以将其改名为scala。将数据文件products.csv,ratings.csv复制到资源文件目录src/main/resources下,我们将从这里读取数据并加载到mongodb中。
5.6.1. Products数据集
数据格式:productId, name, categoryIds, amazonId, imageUrl, categories, tags
例如:
3982^Fuhlen 富勒 M8眩光舞者时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)^1057,439,736^B009EJN4T2^https://images-cn-4.ssl-images-amazon.com/images/I/31QPvUDNavL._SY300_QL70_.jpg^外设产品|鼠标|电脑/办公^富勒|鼠标|电子产品|好用|外观漂亮
Product数据集有7个字段,每个字段之间通过“^”符号进行分割。我们用到的字段为:
字段名|字段类型|字段描述|字段备注 -----|-------|------|------- productId|Int|商品ID name|String|商品名称 categories|String|商品分类 imageUrl|String|商品图片 tags|String|UGC标签
5.6.2. Ratings数据集
数据格式: userId,ProductId,rating,timestamp
e.g.
1,31,2.5,1260759144
Rating数据集有4个字段, 每个字段之间通过“,”分割。
字段名|字段类型|字段描述|字段备注 -----|-------|------|------- userId|Int|用户ID productId|Int|商品ID score|Double|评分 timestamp|Long|评分的时间
5.6.3. 日志管理配置文件
log4j对日志的管理,需要通过配置文件来生效。在src/main/resources下新建配置文件log4j.properties,写入以下内容:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
5.7. 数据初始化到MongoDB
5.7.1. 启动MongoDB数据库(略)
5.7.2. 数据加载程序主体实现
我们会为原始数据定义几个样例类,通过SparkContext的textFile方法从文件中读取数据,并转换成DataFrame,再利用Spark SQL提供的write方法进行数据的分布式插入。
在DataLoader/src/main/scala下新建package,命名为com.atguigu.dataloader,新建名为DataLoader的scala class文件。
程序主体代码如下:
package com.atguigu.dataloader
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Product(productId: Int, name: String, categories: String, imageUrl: String, tags: String)
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// 数据的主加载服务
object DataLoader {
// products.csv和ratings.csv数据集的绝对路径
val PRODUCTS_DATA_PATH = "/Users/yuanzuo/Desktop/ECommerceRecommender/recommender/DataLoader/src/main/resources/products.csv"
val RATING_DATA_PATH = "/Users/yuanzuo/Desktop/ECommerceRecommender/recommender/DataLoader/src/main/resources/ratings.csv"
val MONGODB_PRODUCT_COLLECTION = "Products"
val MONGODB_RATING_COLLECTION = "Rating"
// 程序的入口
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setAppName("DataLoader").setMaster(config.get("spark.cores").get)
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val productRDD = spark.sparkContext.textFile(PRODUCTS_DATA_PATH)
val productDF = productRDD.map(item =>{
val attr = item.split("\\^")
Product(attr(0).toInt, attr(1).trim, attr(5).trim, attr(4).trim, attr(6).trim)
}).toDF()
val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
val ratingDF = ratingRDD.map(item => {
val attr = item.split(",")
Rating(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)
}).toDF()
implicit val mongoConfig = MongoConfig(config.get("mongo.uri").get,config.get("mongo.db").get)
storeDataInMongoDB(productDF, ratingDF)
spark.stop()
}
// 将数据保存到MongoDB中的方法
def storeDataInMongoDB(productDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
//新建一个到MongoDB的连接
val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
//如果MongoDB中有对应的数据库,那么应该删除
mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION).dropCollection()
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()
//将当前数据写入到MongoDB
productDF
.write
.option("uri",mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF
.write
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//对数据表建索引
mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION).createIndex(MongoDBObject("productId" -> 1))
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("userId" -> 1))
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("productId" -> 1))
//关闭MongoDB的连接
mongoClient.close()
}
}
5.8. 离线推荐服务建设
5.8.1. 离线推荐服务
离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。
离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。
离线推荐服务主要分为统计性算法、基于物品的协同过滤、基于ALS的协同过滤推荐算法、基于内容相似度的推荐。
在recommender下新建子项目StatisticsRecommender,pom.xml文件中只需引入spark、scala和mongodb的相关依赖:
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
在resources文件夹下引入log4j.properties,然后在src/main/scala下新建scala单例对象com.atguigu.statistics.StatisticsRecommender。
同样,我们应该先建好样例类,在main()方法中定义配置、创建SparkSession并加载数据,最后关闭spark。代码如下:
src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala
package com.atguigu.statistic
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class Product(productId: Int, name: String, categories: String, imageUrl: String, tags: String)
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri:String, db:String)
case class Recommendation(rid: Int, r: Double)
object StatisticRecommender {
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_PRODUCT_COLLECTION = "Products"
//统计的表的名称
val RATE_MORE_PRODUCTS = "RateMoreProducts"
val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
val AVERAGE_PRODUCTS = "AverageProducts"
// 入口方法
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建SparkConf配置
val sparkConf = new SparkConf().setAppName("StatisticRecommender").setMaster(config("spark.cores"))
// 创建SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 调高日志等级
spark.sparkContext.setLogLevel("ERROR")
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
//加入隐式转换
import spark.implicits._
//数据加载进来
val ratingDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Rating]
.toDF()
val productDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.toDF()
ratingDF.createOrReplaceTempView("ratings")
val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId")
rateMoreProductsDF
.write
.option("uri", mongoConfig.uri)
.option("collection", RATE_MORE_PRODUCTS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
val ratingOfYearMonth = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyProducts = spark.sql("select productId, count(productId) as count, yearmonth from ratingOfMonth group by yearmonth, productId")
rateMoreRecentlyProducts
.write
.option("uri",mongoConfig.uri)
.option("collection",RATE_MORE_RECENTLY_PRODUCTS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
val averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
averageProductsDF.show()
averageProductsDF
.write
.option("uri",mongoConfig.uri)
.option("collection",AVERAGE_PRODUCTS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//关闭Spark
spark.stop()
}
}
5.8.2. 离线统计服务
下面我们针对以上代码分开讲解:
历史热门商品统计
根据所有历史评分数据,计算历史评分次数最多的商品。
实现思路:
通过Spark SQL读取评分数据集,统计所有评分中评分数最多的商品,然后按照从大到小排序,将最终结果写入MongoDB的RateMoreProducts数据集中。
//统计所有历史数据中每个商品的评分数
//数据结构 -> productId,count
val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId")
rateMoreProductsDF
.write
.option("uri", mongoConfig.uri)
.option("collection", RATE_MORE_PRODUCTS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
最近热门商品统计
根据评分,按月为单位计算最近时间的月份里面评分数最多的商品集合。
实现思路:
通过Spark SQL读取评分数据集,通过UDF函数将评分的数据时间修改为月,然后统计每月商品的评分数。统计完成之后将数据写入到MongoDB的RateMoreRecentlyProducts数据集中。
//统计以月为单位拟每个电商的评分数
//数据结构 -> productId,count,time
//创建一个日期格式化工具
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
//注册一个UDF函数,用于将timestamp装换成年月格式 1260759144000 => 201605
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
// 将原来的Rating数据集中的时间转换成年月的格式
val ratingOfYearMonth = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
// 将新的数据集注册成为一张表
ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyProducts = spark.sql("select productId, count(productId) as count, yearmonth from ratingOfMonth group by yearmonth, productId")
rateMoreRecentlyProducts
.write
.option("uri",mongoConfig.uri)
.option("collection",RATE_MORE_RECENTLY_PRODUCTS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
商品平均得分统计
根据历史数据中所有用户对商品的评分,周期性的计算每个商品的平均得分。
实现思路:
通过Spark SQL读取保存在MongDB中的Rating数据集,通过执行以下SQL语句实现对于商品的平均分统计:
val averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
averageProductsDF.show()
averageProductsDF
.write
.option("uri",mongoConfig.uri)
.option("collection",AVERAGE_PRODUCTS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
5.8.3. 基于隐语义模型的协同过滤推荐
项目采用ALS作为协同过滤算法,分别根据MongoDB中的用户评分表和商品数据集计算用户商品推荐矩阵以及商品相似度矩阵。
用户商品推荐矩阵
通过ALS训练出来的Model来计算所有当前用户电商的推荐矩阵,主要思路如下:
-
UserId和ProductID做笛卡尔积,产生(userId,productId)的元组
-
通过模型预测(userId,productId)的元组。
-
将预测结果通过预测分值进行排序。
-
返回分值最大的K个电商,作为当前用户的推荐。
最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中
图略
新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:
<dependencies>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。核心代码如下:
src/main/scala/com.atguigu.offline/OfflineRecommender.scala
package com.atguigu.offline
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
case class Product(productId: Int, name: String, categories: String, imageUrl: String, tags: String)
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
case class Recommendation(rid: Int, r: Double)
case class UserRecs(userId: Int, recs: Seq[Recommendation])
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OfflineRecommender {
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_PRODUCT_COLLECTION = "Products"
val USER_MAX_RECOMMENDATION = 20
val USER_RECS = "UserRecs"
val PRODUCT_RECS = "ProductRecs"
//入口方法
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "reommender"
)
//创建一个SparkConf配置
val sparkConf = new SparkConf().setAppName("OfflineRecommender").setMaster(config("spark.cores")).set("spark.executor.memory","6G").set("spark.driver.memory","2G")
//基于SparkConf创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
//创建一个MongoDBConfig
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
// 读取mongoDB中的业务数据
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating => (rating.userId, rating.productId, rating.score)).cache()
//用户的数据集 RDD[Int]
val userRDD = ratingRDD.map(_._1).distinct()
//电影数据集 RDD[Int]
val productRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.rdd
.map(_.productId).cache()
//创建训练数据集
val trainData = ratingRDD.map(x => Rating(x._1,x._2,x._3))
// r: M x N
// u: M x K
// i: K x N
val (rank,iterations,lambda) = (50, 5, 0.01)
//训练ALS模型
val model = ALS.train(trainData,rank,iterations,lambda)
//计算用户推荐矩阵
//需要构造一个usersProducts RDD[(Int,Int)]
val userProducts = userRDD.cartesian(productRDD)
val preRatings = model.predict(userProducts)
val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => (rating.user, (rating.product, rating.rating)))
.groupByKey()
.map{
case (userId,recs) => UserRecs(userId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1,x._2)))
}.toDF()
userRecs.write
.option("uri",mongoConfig.uri)
.option("collection",USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
val productFeatures = model.productFeatures.map{case (productId, features) =>
(productId, new DoubleMatrix(features))
}
val productRecs = productFeatures.cartesian(productFeatures)
.filter{case (a,b) => a._1 != b._1}
.map{case (a,b) =>
val simScore = this.consinSim(a._2,b._2)
(a._1,(b._1,simScore))
}.filter(_._2._2 > 0.6)
.groupByKey()
.map{case (productId, items) =>
ProductRecs(productId, items.toList.map(x => Recommendation(x._1,x._2)))
}.toDF()
productRecs
.write
.option("uri", mongoConfig.uri)
.option("collection",PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.close()
}
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix) : Double ={
product1.dot(product2) / ( product1.norm2() * product2.norm2() )
}
}
商品相似度矩阵
通过ALS计算商品间相似度矩阵,该矩阵用于查询当前电商的相似电商并为实时推荐系统服务。
离线计算的ALS算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的\(U_{M \times K}\)矩阵,每个用户由\(K\)个特征描述;表示物品特征矩阵的\(V{N \times K}\)矩阵,每个物品也由\(K\)个特征描述。
\(V_{N \times K}\)表示物品特征矩阵,每一行是一个\(K\)维向量,虽然我们并不知道每一个维度的特征意义是什么,但是\(K\)个维度的数学向量表示了该行对应电商的特征。
所以,每个商品用\(V_{N \times K}\)每一行的\((t_1, \dots, t_K)\)向量表示其特征。
于是任意两个商品
\(p\): 特征向量为\(V_p=(t_{p1}, \dots, t_{pk})\)
\(q\): 特征向量为\(V_q=(t_{q1}, \dots, t_{qk})\)
之间的相似度\(sim(p, q)\)可以使用\(V_p\)和\(V_q\)的余弦值来表示:
数据集中任意两个商品间相似度都可以由公式计算得到,商品与商品之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的ProductRecs表中。
图略
核心代码如下:
val productFeatures = model.productFeatures.map{case (productId, features) =>
(productId, new DoubleMatrix(features))
}
val productRecs = productFeatures.cartesian(productFeatures)
.filter{case (a,b) => a._1 != b._1}
.map{case (a,b) =>
val simScore = this.consinSim(a._2,b._2)
(a._1,(b._1,simScore))
}.filter(_._2._2 > 0.6)
.groupByKey()
.map{case (productId, items) =>
ProductRecs(productId, items.toList.map(x => Recommendation(x._1,x._2)))
}.toDF()
productRecs
.write
.option("uri", mongoConfig.uri)
.option("collection",PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix) : Double ={
product1.dot(product2) / ( product1.norm2() * product2.norm2() )
}
模型评估和参数选取
在上述模型训练的过程中,我们直接给定了隐语义模型的\(rank, iterations, lambda\)三个参数。对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型进行评估。通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之间的误差。
有了RMSE,我们可以就可以通过多次调整参数值,来选取RMSE最小的一组作为我们模型的优化选择。
在`scala/com.atguigu.offline/`下新建单例对象ALSTrainer,代码主体架构如下:
package com.atguigu.offline
import breeze.numerics.sqrt
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ALSTrainer {
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
//创建SparkConf
val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))
//创建SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
//加载评分数据
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating => Rating(rating.userId, rating.productId, rating.score)).cache()
// 训练集的数据量是80%,测试集的数量是20%
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testingRDD = splits(1)
//输出最优参数
adjustALSParams(trainingRDD, testingRDD)
//关闭Spark
spark.close()
}
// 输出最终的最优参数
def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={
val result = for(rank <- Array(30,40,50,60,70); lambda <- Array(1, 0.1, 0.001))
yield {
val model = ALS.train(trainData,rank,5,lambda)
val rmse = getRmse(model, testData)
(rank,lambda,rmse)
}
println(result.sortBy(_._3).head)
}
def getRmse(model: MatrixFactorizationModel, testData: RDD[Rating]):Double={
//需要构造一个usersProducts RDD[(Int,Int)]
val userProducts = testData.map(item => (item.user,item.product))
val predictRating = model.predict(userProducts)
val real = testData.map(item => ((item.user,item.product),item.rating))
val predict = predictRating.map(item => ((item.user,item.product),item.rating))
sqrt(
real.join(predict).map{case ((uid,mid),(real,pre))=>
// 真实值和预测值之间的两个差值
val err = real - pre
err * err
}.mean()
)
}
}
其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:
// 输出最终的最优参数
def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit = {
// 这里指定迭代次数为5,rank和lambda在几个值中选取调整
val result = for(rank <- Array(30, 40, 50, 60, 70); lambda <- Array(1, 0.1, 0.001))
yield {
val model = ALS.train(trainData, rank, 5, lambda)
val rmse = getRmse(model, testData)
(rank,lambda,rmse)
}
println(result.sortBy(_._3).head)
}
计算RMSE的函数getRMSE代码实现如下:
def getRmse(model: MatrixFactorizationModel, testData: RDD[Rating]): Double = {
//需要构造一个usersProducts RDD[(Int,Int)]
val userProducts = testData.map(item => (item.user,item.product))
val predictRating = model.predict(userProducts)
val real = testData.map(item => ((item.user,item.product),item.rating))
val predict = predictRating.map(item => ((item.user,item.product),item.rating))
sqrt(
real.join(predict).map{case ((uid,mid),(real,pre))=>
// 真实值和预测值之间的两个差值
val err = real - pre
err * err
}.mean()
)
}
运行代码,我们就可以得到目前数据的最优模型的超参数。
5.9. 实时推荐服务建设
5.9.1. 实时推荐服务
实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该反映最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好。
用户对物品的偏好随着时间的推移总是会改变的。比如一个用户\(u\)在某时刻对商品\(p\)给予了极高的评分,那么在近期一段时候,\(u\)极有可能很喜欢与商品\(p\)类似的其他商品; 而如果用户\(u\)在某时刻对商品\(q\)给予了极低的评分,那么在近期一段时候,\(u\)极有可能不喜欢与商品\(q\)类似的其他商品。所以对于实时推荐,当用户对一个电商进行了评价后,用户会希望推荐结果基于最近这几次评分进行一定的更新,使得推荐结果匹配用户近期的偏好,满足用户近期的口味。
如果实时推荐继续采用离线推荐中的ALS算法,由于算法运行时间巨大,不具有实时得到新的推荐结果的能力;并且由于算法本身的使用的是评分表,用户本次评分后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少差别,从而给用户一种推荐结果一直没变化的感觉,很影响用户体验。
另外,在实时推荐中由于时间性能上要满足实时或者准实时的要求,所以算法的计算量不能太大,避免复杂、过多的计算造成用户体验的下降。鉴于此,推荐精度往往不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐的精度要求则可以适当放宽。
所以对于实时推荐算法,主要有两点需求:
-
用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果;
-
计算量不大,满足响应时间上的实时或者准实时要求;
5.9.2. 实时推荐算法设计
当用户\(u\)对商品\(p\)进行了评分,将触发一次对u的推荐结果的更新。由于用户\(u\)对商品\(p\)评分,对于用户\(u\)来说,他与\(p\)最相似的商品们之间的推荐强度将发生变化,所以选取与商品\(p\)最相似的\(K\)个商品作为候选商品。
每个候选商品按照“推荐优先级”这一权重作为衡量这个商品被推荐给用户\(u\)的优先级。
这些商品将根据用户\(u\)最近的若干评分计算出各自对用户\(u\)的推荐优先级,然后与上次对用户\(u\)的实时推荐结果的进行基于推荐优先级的合并、替换得到更新后的推荐结果。
具体来说:
首先,获取用户\(u\)按时间顺序最近的K个评分,记为\(R_K\);获取商品\(p\)的最相似的\(K\)个商品集合,记为\(S\);
然后,对于每个商品\(q\)属于\(S\),计算其推荐优先级\(E_{uq}\),计算公式如下:
其中: - \(R_r\)表示用户\(u\)对商品\(r\)的评分; - \(sim(q,r)\)表示商品\(q\)与商品\(r\)的相似度,设定最小相似度为0.6,当商品\(q\)和商品\(r\)相似度低于0.6的阈值,则视为两者不相关并忽略; - \(sim \underline\space sum\)表示\(q\)与\(R_K\)中商品相似度大于最小阈值的个数; - \(incount\)表示\(R_K\)中与商品\(q\)相似的、且本身评分较高\((>=3)\)的商品个数; - \(recount\)表示\(R_K\)中与商品\(q\)相似的、且本身评分较低\((<3)\)的商品个数;
公式的意义如下:
首先对于每个候选商品\(q\),从\(u\)最近的\(K\)个评分中,找出与\(q\)相似度较高\((>=0.6)\)的\(u\)已评分商品们,对于这些商品们中的每个商品\(r\),将\(r\)与\(q\)的相似度乘以用户\(u\)对\(r\)的评分,将这些乘积计算平均数,作为用户\(u\)对商品\(q\)的评分预测即
然后,将\(u\)最近的\(K\)个评分中与商品\(q\)相似的、且本身评分较高\((>=3)\)的商品个数记为\(incount\),计算\(log_2max(incount, 1)\)作为商品\(q\)的“增强因子”,意义在于商品\(q\)与\(u\)的最近\(K\)个评分中的\(n\)个高评分\((>=3)\)商品相似,则商品\(q\)的优先级被增加\(log_2max(incount, 1)\)。如果商品\(q\)与\(u\)的最近\(K\)个评分中相似的高评分商品越多,也就是说\(n\)越大,则商品\(q\)更应该被推荐,所以推荐优先级被增强的幅度较大;如果商品\(q\)与\(u\)的最近K个评分中相似的高评分商品越少,也就是\(n\)越小,则推荐优先级被增强的幅度较小;
而后,将\(u\)最近的\(K\)个评分中与商品\(q\)相似的、且本身评分较低\((<3)\)的商品个数记为\(recount\),计算\(log_2max(recount, 1)\)作为商品\(q\)的“削弱因子”,意义在于商品q与u的最近K个评分中的n个低评分\((<3)\)商品相似,则商品q的优先级被削减\(log_2max(recount, 1)\)。如果商品\(q\)与\(u\)的最近\(K\)个评分中相似的低评分商品越多,也就是说\(n\)越大,则商品\(q\)更不应该被推荐,所以推荐优先级被减弱的幅度较大;如果商品\(q\)与\(u\)的最近\(K\)个评分中相似的低评分商品越少,也就是\(n\)越小,则推荐优先级被减弱的幅度较小;
最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的\(q\)商品对于\(u\)的推荐优先级。在计算完每个候选商品\(q\)的\(E_{uq}\)后,将生成一组\((商品q的ID, q的推荐优先级)\)的列表\(updatedList\):
而在本次为用户\(u\)实时推荐之前的上一次实时推荐结果\(Rec\)也是一组$(商品m, m的推荐优先级)$的列表,其大小也为\(K\):
接下来,将\(updated_S\)与本次为\(u\)实时推荐之前的上一次实时推荐结果\(Rec\)进行基于合并、替换形成新的推荐结果\(New\space Rec\):
其中,\(i\)表示\(updated_S\)与\(Rec\)的商品集合中的每个商品,\(topK\)是一个函数,表示从\(Rec\)和\(updated_S\)的并集中选择出最大的\(K\)个商品,\(cmp=E_{ui}\)表示topK函数将推荐优先级\(E_{ui}\)值最大的K个商品选出来。最终,\(New \space Rec\)即为经过用户\(u\)对商品\(p\)评分后触发的实时推荐得到的最新推荐结果。
总之,实时推荐算法流程流程基本如下:
-
用户\(u\)对商品\(p\)进行了评分,触发了实时推荐的一次计算;
-
选出商品\(p\)最相似的\(K\)个商品作为集合\(S\);
-
获取用户\(u\)最近时间内的\(K\)条评分,包含本次评分,作为集合\(R_K\);
-
计算商品的推荐优先级,产生\((qID, E_{uq})\)集合\(updated_S\);
将\(updated_S\)与上次对用户\(u\)的推荐结果\(Rec\)利用公式(4-4)进行合并,产生新的推荐结果\(New\space Rec\);作为最终输出。
我们在recommender下新建子项目StreamingRecommender,引入spark、scala、mongo、redis和kafka的依赖:
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
代码中首先定义样例类和一个连接助手对象(用于建立redis和mongo连接),并在StreamingRecommender中定义一些常量:
src/main/scala/com.atguigu.streaming/StreamingRecommender.scala
package com.atguigu.streaming
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import scala.collection.JavaConversions._
object ConnHelper extends Serializable{
lazy val jedis = new Jedis("localhost")
lazy val mongoClient = MongoClient(MongoClientURI("mongodb://localhost:27017/recommender"))
}
case class MongConfig(uri: String, db: String)
//推荐
case class Recommendation(rid: Int, r: Double)
// 用户的推荐
case class UserRecs(uid: Int, recs: Seq[Recommendation])
//商品的相似度
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object StreamingRecommender {
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_PRODUCTS_NUM = 20
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_PRODUCT_RECS_COLLECTION = "ProductRecs"
//入口方法
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender",
"kafka.topic" -> "recommender"
)
//创建一个SparkConf配置
val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.cores"))
//创建Spark的对象, 因为spark session中没有封装streaming context,所以需要new一个
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc,Seconds(2))
implicit val mongConfig = MongConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
//****************** 广播商品相似度矩阵
val simProductsMatrix = spark
.read
.option("uri",config("mongo.uri"))
.option("collection",MONGODB_PRODUCT_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRecs]
.rdd
.map{recs =>
(recs.productId, recs.recs.map(x => (x.rid, x.r)).toMap)
}.collectAsMap()
val simProductsMatrixBroadCast = sc.broadcast(simProductsMatrix)
val abc = sc.makeRDD(1 to 2)
abc.map(x => simProductsMatrixBroadCast.value.get(1)).count()
//******************
//创建到Kafka的连接
val kafkaPara = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "recommender",
"auto.offset.reset" -> "latest" //每次从kafka 消费数据,都是通过zookeeper存储的数据offset,来判断需要获取消息在消息日志里的起始位置
)
val kafkaStream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(config("kafka.topic")),kafkaPara))
val ratingStream = kafkaStream.map{case msg =>
var attr = msg.value().split("\\|") // split方法对. | * + ^需要转义(类似正则)
(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
}
ratingStream.foreachRDD{rdd =>
rdd.map{case (userId, productId, score, timestamp) =>
println(">>>>>>>>>>>>>>>>")
//获取当前最近的M次商品评分
val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, userId, ConnHelper.jedis)
//获取商品P最相似的K个商品
val simProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM, productId, userId, simProductsMatrixBroadCast.value)
//计算待选商品的推荐优先级
val streamRecs = computeProductScores(simProductsMatrixBroadCast.value, userRecentlyRatings, simProducts)
//将数据保存到MongoDB
saveRecsToMongoDB(userId, streamRecs)
}.count()
}
//启动Streaming程序
ssc.start()
ssc.awaitTermination()
}
def saveRecsToMongoDB(uid:Int,streamRecs:Array[(Int,Double)])(implicit mongConfig: MongConfig): Unit ={
//到StreamRecs的连接
val streamRecsCollection = ConnHelper.mongoClient(mongConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
streamRecsCollection.findAndRemove(MongoDBObject("uid" -> uid))
//streaRecsCollection.insert(MongoDBObject("uid" -> uid, "recs" -> streamRecs.map(x=> x._1+":"+x._2).mkString("|")))
streamRecsCollection.insert(MongoDBObject("uid"->uid, "recs"-> streamRecs.map(x => MongoDBObject("mid"->x._1, "score"->x._2)) ))
}
def computeProductScores(simProducts: scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]],userRecentlyRatings:Array[(Int,Double)],topSimProducts: Array[Int]): Array[(Int,Double)] = {
//用于保存每一个待选商品和最近评分的每一个商品的权重得分
val score = scala.collection.mutable.ArrayBuffer[(Int,Double)]()
//用于保存每一个商品的增强因子数
val increMap = scala.collection.mutable.HashMap[Int,Int]()
//用于保存每一个商品的减弱因子数
val decreMap = scala.collection.mutable.HashMap[Int,Int]()
for (topSimProduct <- topSimProducts; userRecentlyRating <- userRecentlyRatings){
val simScore = getProductsSimScore(simProducts, userRecentlyRating._1, topSimProduct)
if(simScore > 0.6){
score += ((topSimProduct, simScore * userRecentlyRating._2 ))
if(userRecentlyRating._2 > 3){
increMap(topSimProduct) = increMap.getOrDefault(topSimProduct,0) + 1
}else{
decreMap(topSimProduct) = decreMap.getOrDefault(topSimProduct,0) + 1
}
}
}
score.groupBy(_._1).map{case (mid,sims) =>
(mid,sims.map(_._2).sum / sims.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)))
}.toArray
}
//取2的对数
def log(m: Int): Double ={
math.log(m) / math.log(2)
}
def getProductsSimScore(simProducts: scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]], userRatingProduct: Int, topSimProduct: Int): Double ={
simProducts.get(topSimProduct) match {
case Some(sim) => sim.get(userRatingProduct) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
def getTopSimProducts(num: Int, productId: Int, userId: Int, simProducts:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]])(implicit mongConfig: MongConfig): Array[Int] ={
//从广播变量的商品相似度矩阵中获取当前商品所有的相似商品
val allSimProducts = simProducts(productId).toArray
//获取用户已经观看过得商品
val ratingExist = ConnHelper.mongoClient(mongConfig.db)(MONGODB_RATING_COLLECTION).find(MongoDBObject("userId" -> userId)).toArray.map{item =>
item.get("productId").toString.toInt
}
//过滤掉已经评分过得商品,并排序输出
allSimProducts.filter(x => !ratingExist.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
}
def getUserRecentlyRating(num: Int, userId: Int, jedis: Jedis): Array[(Int, Double)] ={
//从用户的队列中取出num个评论
jedis.lrange("userId:" + userId.toString, 0, num).map{item =>
val attr = item.split("\\:")
(attr(0).trim.toInt, attr(1).trim.toDouble)
}.toArray
}
}
5.9.3. 实时推荐算法的讲解
实时推荐算法的前提:
-
在Redis集群中存储了每一个用户最近对商品的K次评分。实时算法可以快速获取。
-
离线推荐算法已经将商品相似度矩阵提前计算到了MongoDB中。
-
Kafka已经获取到了用户实时的评分数据。
算法过程如下:
实时推荐算法输入为一个评分(userId, ProductId, rate, timestamp),而执行的核心内容包括:获取userId最近K次评分、获取productId最相似K个商品、计算候选商品的推荐优先级、更新对userId的实时推荐结果。
获取用户的K次最近评分
业务服务器在接收用户评分的时候,默认会将该评分情况以userId, productId, rating, timestamp的格式插入到Redis中该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可。
import scala.collection.JavaConversions._
/**
* 获取当前最近的M次商品评分
* @param num 评分的个数
* @param userId 谁的评分
* @return
*/
def getUserRecentlyRating(num:Int, userId:Int,jedis:Jedis): Array[(Int,Double)] ={
//从用户的队列中取出num个评分
jedis.lrange("userId:"+userId.toString, 0, num).map{item =>
val attr = item.split("\\:")
(attr(0).trim.toInt, attr(1).trim.toDouble)
}.toArray
}
获取当前商品最相似的K个商品
在离线算法中,已经预先将商品的相似度矩阵进行了计算,所以每个商品productId的最相似的K个商品很容易获取:从MongoDB中读取ProductRecs数据,从productId在simHash对应的子哈希表中获取相似度前K大的那些商品。输出是数据类型为Array[Int]的数组,表示与productId最相似的商品集合,并命名为candidateProducts以作为候选商品集合。
/**
* 获取当前商品K个相似的商品
* @param num 相似商品的数量
* @param productId 当前商品的ID
* @param userId 当前的评分用户
* @param simProducts 商品相似度矩阵的广播变量值
* @param mongConfig MongoDB的配置
* @return
*/
def getTopSimProducts(num:Int, productId:Int, userId:Int, simProducts:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]])(implicit mongConfig: MongConfig): Array[Int] ={
//从广播变量的商品相似度矩阵中获取当前商品所有的相似商品
val allSimProducts = simProducts.get(productId).get.toArray
//获取用户已经评分过的商品
val ratingExist = ConnHelper.mongoClient(mongConfig.db)(MONGODB_RATING_COLLECTION).find(MongoDBObject("userId" -> userId)).toArray.map{item =>
item.get("productId").toString.toInt
}
//过滤掉已经评分过的商品,并排序输出
allSimProducts.filter(x => !ratingExist.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
}
商品推荐优先级计算
对于候选商品集合simiHash和userId的最近K个评分recentRatings,算法代码内容如下:
/**
* 计算待选商品的推荐分数
* @param simProducts 商品相似度矩阵
* @param userRecentlyRatings 用户最近的k次评分
* @param topSimProducts 当前商品最相似的K个商品
* @return
*/
def computeProductScores(simProducts: scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]],userRecentlyRatings:Array[(Int,Double)],topSimProducts: Array[Int]): Array[(Int,Double)] = {
//用于保存每一个待选商品和最近评分的每一个商品的权重得分
val score = scala.collection.mutable.ArrayBuffer[(Int,Double)]()
//用于保存每一个商品的增强因子数
val increMap = scala.collection.mutable.HashMap[Int,Int]()
//用于保存每一个商品的减弱因子数
val decreMap = scala.collection.mutable.HashMap[Int,Int]()
for (topSimProduct <- topSimProducts; userRecentlyRating <- userRecentlyRatings){
val simScore = getProductsSimScore(simProducts, userRecentlyRating._1, topSimProduct)
if(simScore > 0.6){
score += ((topSimProduct, simScore * userRecentlyRating._2 ))
if(userRecentlyRating._2 > 3){
increMap(topSimProduct) = increMap.getOrDefault(topSimProduct,0) + 1
}else{
decreMap(topSimProduct) = decreMap.getOrDefault(topSimProduct,0) + 1
}
}
}
score.groupBy(_._1).map{case (mid,sims) =>
(mid,sims.map(_._2).sum / sims.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)))
}.toArray
}
其中,getProductSimScore是取候选商品和已评分商品的相似度,代码如下:
/**
* 获取当个商品之间的相似度
* @param simProducts 商品相似度矩阵
* @param userRatingProduct 用户已经评分的商品
* @param topSimProduct 候选商品
* @return
*/
def getProductsSimScore(simProducts: scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]], userRatingProduct: Int, topSimProduct: Int): Double = {
simProducts.get(topSimProduct) match {
case Some(sim) => sim.get(userRatingProduct) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
而log是对数运算,这里实现为取2的对数(常用对数):
//取2的对数
def log(m: Int): Double = {
math.log(m) / math.log(2)
}
将结果保存到mongoDB
saveRecsToMongoDB函数实现了结果的保存:
/**
* 将数据保存到MongoDB userId -> 1, recs -> 22:4.5|45:3.8
* @param streamRecs 流式的推荐结果
* @param mongConfig MongoDB的配置
*/
def saveRecsToMongoDB(userId: Int, streamRecs: Array[(Int,Double)])(implicit mongConfig: MongConfig): Unit = {
//到StreamRecs的连接
val streamRecsCollection = ConnHelper.mongoClient(mongConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
streamRecsCollection.findAndRemove(MongoDBObject("userId" -> userId))
//streaRecsCollection.insert(MongoDBObject("userId" -> userId, "recs" -> streamRecs.map(x=> x._1+":"+x._2).mkString("|")))
streamRecsCollection.insert(MongoDBObject("userId" -> userId, "recs" -> streamRecs.map(x => MongoDBObject("productId" -> x._1, "score" -> x._2))))
}
更新实时推荐结果
当计算出候选商品的推荐优先级的数组updatedRecommends<productId, E>后,这个数组将被发送到Web后台服务器,与后台服务器上userId的上次实时推荐结果recentRecommends<productId, E>进行合并、替换并选出优先级E前K大的商品作为本次新的实时推荐。具体而言:
-
合并:将updatedRecommends与recentRecommends并集合成为一个新的<productId, E>数组;
-
替换(去重):当updatedRecommends与recentRecommends有重复的商品productId时,recentRecommends中productId的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的updatedRecommends的productId的推荐优先级;
-
选取TopK:在合并、替换后的<ProductId, E>数组上,根据每个product的推荐优先级,选择出前K大的商品,作为本次实时推荐的最终结果。
5.9.4. 实时系统联调
我们的系统实时推荐的数据流向是:业务系统 → 日志 → flume 日志采集 → kafka streaming数据清洗和预处理 → spark streaming 流式计算。在我们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。
启动实时系统的基本组件
启动实时推荐系统StreamingRecommender以及mongodb、redis
启动zookeeper
$ bin/zkServer.sh start
启动kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
构建Kafka Streaming程序
在recommender下新建module,KafkaStreaming,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml文件需要引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<finalName>kafkastream</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.kafkastream.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在src/main/java下新建java类com.atguigu.kafkastreaming.Application
package com.atguigu.kafkastream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Properties;
public class Application {
public static void main(String[] args){
String brokers = "localhost:9092";
String zookeepers = "localhost:2181";
String from = "log";
String to = "recommender";
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
StreamsConfig config = new StreamsConfig(settings);
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESS");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
这个程序会将topic为“log”的信息流获取来做处理,并以“recommender”为新的topic转发出去。
流处理程序LogProcess.java
package com.atguigu.kafkastream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[],byte[]> {
private ProcessorContext context;
public void init(ProcessorContext context) {
this.context = context;
}
public void process(byte[] dummy, byte[] line) {
String input = new String(line);
if(input.contains("PRODUCT_RATING_PREFIX:")){
System.out.println("product rating coming!!!!");
input = input.split("PRODUCT_RATING_PREFIX:")[1].trim();
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
public void punctuate(long timestamp) {
}
public void close() {
}
}
完成代码后,启动Application。
配置并启动flume
在flume的conf目录下新建log-kafka.properties,对flume连接kafka做配置:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail –f /mnt/d/Projects/BigData/ProductRecommender/businessServer/src/main/log/agent.log
agent.sources.exectail.interceptors = i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
配置好后,启动flume:
$ ./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
启动业务系统后台
将业务代码加入系统中, 将仓库中的`businessServer`文件夹拷贝到项目中, 和`recommender`同级。注意在`src/main/resources/`下的`log4j.properties`中,`log4j.appender.file.File`的值应该替换为自己的日志目录,与flume中的配置应该相同。例如:
log4j.appender.file.File=/mnt/d/Projects/BigData/ProductRecommender/businessServer/src/main/log/agent.log
运行业务系统:
-
点击idea右侧的maven projects.
-
找到businessServer中的plugins中的tomcat7插件.
-
双击运行tomcat7:run.
启动业务系统后台,访问`localhost:8088`;点击某个商品进行评分,查看实时推荐列表是否会发生变化。
5.10. 冷启动问题处理
整个推荐系统更多的是依赖于用于的偏好信息进行商品的推荐,那么就会存在一个问题,对于新注册的用户是没有任何偏好信息记录的,那这个时候推荐就会出现问题,导致没有任何推荐的商品出现。
我们在电商推荐中解决冷启动的方案是:给新注册的用户推荐热门的商品,例如近期热门商品、历史热门商品等策略。
实际生产环境中可以根据不同的业务场景调整策略。例如资讯类应用可以先给用户一个交互式的标签页面,让用户自己选择感兴趣的标签,然后推荐用户感兴趣标签的热门内容。
5.11. 基于内容的推荐服务
5.11.1. 基于内容的推荐服务
原始数据中的tags字段,是用户给商品打上的标签,这部分内容想要直接转成评分并不容易,不过我们可以将标签内容进行提取,得到商品的内容特征向量,进而可以通过求取相似度矩阵。这部分可以与实时推荐系统直接对接,计算出与用户当前评分商品的相似商品,实现基于内容的实时推荐。为了避免热门标签对特征提取的影响,我们还可以通过TF-IDF算法对标签的权重进行调整,从而尽可能地接近用户偏好。
5.11.2. 基于内容推荐的实现
基于以上思想,加入TF-IDF算法的求取商品特征向量的核心代码如下:
package com.atguigu.content
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.jblas.DoubleMatrix
case class MongoConfig(uri: String, db: String)
case class Product(productId: Int, name: String, categories: String, imageUrl: String, tags: String)
//推荐
case class Recommendation(rid: Int, r: Double)
// 用户的推荐
case class UserRecs(userId: Int, recs: Seq[Recommendation])
//商品的相似度
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ContentBasedRecommender {
val MONGODB_PRODUCT_COLLECTION = "Products"
val PRODUCT_RECS = "ContentBasedProductRecs"
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix) : Double ={
product1.dot(product2) / ( product1.norm2() * product2.norm2() )
}
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "reommender"
)
//创建一个SparkConf配置
val sparkConf = new SparkConf().setAppName("ContentBasedRecommender").setMaster(config("spark.cores")).set("spark.executor.memory","6G").set("spark.driver.memory","2G")
//基于SparkConf创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
//创建一个MongoDBConfig
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
val productRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.rdd
.map(x => (x.productId, x.name, x.tags.map(c => if(c == '|') ' ' else c)))
val productSeq = productRDD.collect()
val tagsData = spark.createDataFrame(productSeq).toDF("productId", "name", "tags")
// 实例化一个分词器,默认按空格分
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
// 用分词器做转换,生成列“words”,返回一个dataframe,增加一列words
val wordsData = tokenizer.transform(tagsData)
wordsData.show(5)
// HashingTF是一个工具,可以把一个词语序列,转换成词频(初始特征)
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(189)
// 用 HashingTF 做处理,返回dataframe
val featurizedData = hashingTF.transform(wordsData)
// IDF 也是一个工具,用于计算文档的IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 将词频数据传入,得到idf模型(统计文档)
val idfModel = idf.fit(featurizedData)
// 模型对原始数据做处理,计算出idf后,用tf-idf得到新的特征矩阵
val rescaledData = idfModel.transform(featurizedData)
rescaledData.show(5)
val productFeatures = rescaledData.map{
case row => {
if (row.getAs[Int]("productId") == 160597 || row.getAs[Int]("productId") == 8195) {
println(row)
}
(row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray)
}
}
.rdd
.map(x => {
(x._1, new DoubleMatrix(x._2) )
})
val productRecs = productFeatures.cartesian(productFeatures)
.filter{case (a, b) => a._1 != b._1}
.map {
case (a, b) => {
if (a._1==160597 && b._1==8195) {
println(a._1, a._2, b._1, b._2)
}
val simScore = this.consinSim(a._2, b._2)
(a._1, (b._1, simScore))
}
}
.groupByKey()
.map {
case (productId, items) => ProductRecs(productId, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)).take(5))
}
.toDF()
productRecs.show(5)
productRecs
.write
.option("uri", mongoConfig.uri)
.option("collection", PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//关闭Spark
spark.close()
}
}
然后通过商品特征向量进而求出相似度矩阵,就可以为实时推荐提供基础,得到用户推荐列表了。可以看出,基于内容和基于隐语义模型,目的都是为了提取出物品的特征向量,从而可以计算出相似度矩阵。而我们的实时推荐系统算法正是基于相似度来定义的。
当然,别忘记在`pom.xml`中添加依赖:
<dependencies>
<!-- 引入Spark相关的Jar包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
</dependencies>
5.12. 基于物品的协同过滤
基于同现相似度的计算公式来计算不同物品间的相似度。
其中$N_i$为购买商品\(i\)的用户列表,$N_j$为购买商品$j$的用户列表。
核心代码如下:
package com.atguigu.itemcf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
// 物品信息
case class Product(productId: Int, name: String, categories: String, imageUrl: String, tags: String)
case class MongoConfig(uri:String, db:String)
// 用户-物品-评分
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
// 用户信息
case class User(userId: Int)
case class Recommendation(rid: Int, r: Double)
case class ProductRecs(productId: Int, recs:Seq[Recommendation])
object ItemCFRecommender {
// 同现相似度计算公式
// 比如:对A评分的人数100,对B评分的人数100,交集人数20
// 同现相似度:20 / 100 = 0.2
def cooccurrence(numOfRatersForAAndB: Long, numOfRatersForA: Long, numOfRatersForB: Long): Double = {
numOfRatersForAAndB / math.sqrt(numOfRatersForA * numOfRatersForB)
}
val MONGODB_PRODUCT_COLLECTION = "Products"
val MONGODB_RATING_COLLECTION = "Rating"
val PRODUCT_RECS = "ItemCFProductRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "reommender"
)
//创建一个SparkConf配置
val sparkConf = new SparkConf().setAppName("ItemCFRecommender").setMaster(config("spark.cores")).set("spark.executor.memory","6G").set("spark.driver.memory","2G")
//基于SparkConf创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
//创建一个MongoDBConfig
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
// 读取mongoDB中的业务数据
val ratingDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Rating]
.rdd
.map {
rating => {
(rating.userId, rating.productId, rating.score)
}
}
.cache()
.toDF("userId", "productId", "rating")
val numRatersPerProduct = ratingDF.groupBy("productId").count().alias("nor")
// 在原记录基础上加上product的打分者的数量
val ratingsWithSize = ratingDF.join(numRatersPerProduct, "productId")
// 执行内联操作
val joinedDF = ratingsWithSize.join(ratingsWithSize, "userId")
.toDF("userId", "product1", "rating1", "nor1", "product2", "rating2", "nor2")
joinedDF
.selectExpr("userId", "product1", "nor1", "product2", "nor2")
.createOrReplaceTempView("joined")
// 计算必要的中间数据,注意此处有WHERE限定,只计算了一半的数据量
val sparseMatrix = spark.sql(
"""
|SELECT product1
|, product2
|, count(userId) as size
|, first(nor1) as nor1
|, first(nor2) as nor2
|FROM joined
|GROUP BY product1, product2
""".stripMargin)
.cache()
// 计算物品相似度
var sim = sparseMatrix.map(row => {
val size = row.getAs[Long](2)
val numRaters1 = row.getAs[Long](3)
val numRaters2 = row.getAs[Long](4)
val cooc = cooccurrence(size, numRaters1, numRaters2)
(row.getInt(0), row.getInt(1), cooc)
}).toDF("productId_01", "productId_02", "cooc")
val simDF = sim
.map{
case row => (
row.getAs[Int]("productId_01"),
row.getAs[Int]("productId_02"),
row.getAs[Double]("cooc")
)
}
.rdd
.map(
x => (x._1, (x._2, x._3))
)
.groupByKey()
.map {
case (productId, items) => ProductRecs(productId, items.toList.filter(x => x._1 != productId).sortWith(_._2 > _._2).map(x => Recommendation(x._1,x._2)).take(5))
}
.toDF()
simDF
.write
.option("uri", mongoConfig.uri)
.option("collection", PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//关闭Spark
spark.close()
}
}
别忘了添加依赖:
<dependencies>
<!-- 引入Spark相关的Jar包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>2.1.1</version>
<!-- provider如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
</dependencies>
5.13. 附记
-
DataLoader用来将csv文件中的数据导入MongoDB.
-
StatisticRecommender统计一些热门商品和评分最多的商品,用来解决冷启动问题。显示在首页的`热门推荐`和`评分最多`标签下。
-
OfflineRecommender: ALS矩阵分解算法。
-
UserRecs: 矩阵分解再相乘得到的预测评分矩阵降序排列,可以对用户进行推荐。显示在首页的`离线推荐`标签下。注意:注册完用户以后,需要完成评分操作,再运行OfflineRecommender.scala.
-
ProductRecs: 使用物品隐特征矩阵来两两计算余弦相似度。
-
-
ItemCFRecommender: 使用同现相似度来计算物品之间的相似性。显示在商品详情页的`基于物品的协同过滤`标签下。
-
ContentBasedRecommender: 基于商品的标签的TFIDF权重向量,来计算余弦相似度。显示在商品详情页的`基于内容的推荐`标签下。
-
StreamingRecommender: 使用的物品之间相似度是使用的物品隐特征矩阵算出来的余弦相似度。然后和Redis里面保存的最近评分进行融合。显示在`实时推荐`下。
-
KafkaStream模块用来从topic=lo消费数据,并发送到topic=recommender中。
5.14. 算法使用场景:
-
ItemCF: 适用于电商电影类网站。
-
ContentBased: 适用于新闻类网站。
-
ALS: 都可以,矩阵分解主要用来做降维。
6. 用Python从头实现神经网络
6.1. 砖块:神经元
首先让我们看看神经网络的基本单位,神经元。神经元接受输入,对其做一些数据操作,然后产生输出。例如,这是一个2-输入神经元:
这里发生了三个事情。首先,每个输入都跟一个权重相乘(红色):
然后,加权后的输入求和,加上一个偏差b(绿色):
最后,这个结果传递给一个激活函数f:
激活函数的用途是将一个无边界的输入,转变成一个可预测的形式。常用的激活函数就就是S型函数:
S型函数的值域是(0, 1)。简单来说,就是把(−∞, +∞)压缩到(0, 1) ,很大的负数约等于0,很大的正数约等于1。
6.2. 一个简单的例子
假设我们有一个神经元,激活函数就是S型函数,其参数如下:
w=[0, 1] 就是以向量的形式表示w_1=0, w_2=1。现在,我们给这个神经元一个输入x=[2, 3]。我们用点积来表示:
当输入是[2, 3]时,这个神经元的输出是0.999。给定输入,得到输出的过程被称为前馈(feedforward)。
6.3. 编码一个神经元
让我们来实现一个神经元!用Python的NumPy库来完成其中的数学计算:
import numpy as np
def sigmoid(x):
# Our activation function: f(x) = 1 / (1 + e^(-x))
return 1 / (1 + np.exp(-x))
class Neuron:
def __init__(self, weights, bias):
self.weights = weights
self.bias = bias
def feedforward(self, inputs):
# Weight inputs, add bias, then use the activation function
total = np.dot(self.weights, inputs) + self.bias
return sigmoid(total)
weights = np.array([0, 1]) # w1 = 0, w2 = 1
bias = 4 # b = 4
n = Neuron(weights, bias)
x = np.array([2, 3]) # x1 = 2, x2 = 3
print(n.feedforward(x)) # 0.9990889488055994
还记得这个数字吗?就是我们前面算出来的例子中的0.999。
6.4. 把神经元组装成网络
所谓的神经网络就是一堆神经元。这就是一个简单的神经网络:
这个网络有两个输入,一个有两个神经元(\(h_1\)和\(h_2\))的隐藏层,以及一个有一个神经元(\(o_1\))的输出层。要注意,\(o_1\)的输入就是\(h_1\)和\(h_2\)的输出,这样就组成了一个网络。
隐藏层就是输入层和输出层之间的层,隐藏层可以是多层的。 |
6.5. 例子:前馈
我们继续用前面图中的网络,假设每个神经元的权重都是\(w=[0,1\)],截距项也相同\(b=0\),激活函数也都是S型函数。分别用\(h_1, h_2, o_1\)表示相应的神经元的输出。
当输入\(x=[2,3\)]时,会得到什么结果?
这个神经网络对输入[2,3]的输出是0.7216,很简单。
一个神经网络的层数以及每一层中的神经元数量都是任意的。基本逻辑都一样:输入在神经网络中向前传输,最终得到输出。接下来,我们会继续使用前面的这个网络。
6.6. 编码神经网络:前馈
接下来我们实现这个神经网络的前馈机制,还是这个图:
import numpy as np
# ... code from previous section here
class OurNeuralNetwork:
'''
A neural network with:
- 2 inputs
- a hidden layer with 2 neurons (h1, h2)
- an output layer with 1 neuron (o1)
Each neuron has the same weights and bias:
- w = [0, 1]
- b = 0
'''
def __init__(self):
weights = np.array([0, 1])
bias = 0
# The Neuron class here is from the previous section
self.h1 = Neuron(weights, bias)
self.h2 = Neuron(weights, bias)
self.o1 = Neuron(weights, bias)
def feedforward(self, x):
out_h1 = self.h1.feedforward(x)
out_h2 = self.h2.feedforward(x)
# The inputs for o1 are the outputs from h1 and h2
out_o1 = self.o1.feedforward(np.array([out_h1, out_h2]))
return out_o1
network = OurNeuralNetwork()
x = np.array([2, 3])
print(network.feedforward(x)) # 0.7216325609518421
结果正确,看上去没问题。
6.7. 训练神经网络,第1部分
现在有这样的数据:
Name | Weight | Height | Gender |
---|---|---|---|
Alice |
133 |
65 |
F |
Bob |
160 |
72 |
M |
Charlie |
152 |
70 |
M |
Diana |
120 |
60 |
F |
接下来我们用这个数据来训练神经网络的权重和截距项,从而可以根据身高体重预测性别:
我们用0和1分别表示男性(M)和女性(F),并对数值做了转化:
Name | Weight | Height | Gender |
---|---|---|---|
Alice |
-2 |
-1 |
1 |
Bob |
25 |
6 |
0 |
Charlie |
17 |
4 |
0 |
Diana |
-15 |
-6 |
1 |
我这里是随意选取了135和66来标准化数据,通常会使用平均值。 |
6.8. 损失
在训练网络之前,我们需要量化当前的网络是『好』还是『坏』,从而可以寻找更好的网络。这就是定义损失的目的。
我们在这里用平均方差(MSE)损失: \(MSE = \frac{1}{n}\sum_{i=1}^{n}(y_{true} - y_{pred})^2\) 让我们仔细看看:
-
n是样品数,这里等于4(Alice、Bob、Charlie和Diana)。
-
y表示要预测的变量,这里是性别。
-
\(y_{true}\) 是变量的真实值(『正确答案』)。例如,Alice的 \(y_{true}\) 就是1(男性)。
-
\(y_{pred}\) 是变量的预测值。这就是我们网络的输出。
\((y_{true}-y_{pred})^2\)被称为方差(squared error)。我们的损失函数就是所有方差的平均值。预测效果于浩,损失就越少。
更好的预测 = 更少的损失!
训练网络 = 最小化它的损失。
6.9. 损失计算例子
假设我们的网络总是输出0,换言之就是认为所有人都是男性。损失如何?
Name | \(y_{true}\) | \(y_{pred}\) | \(y_{true}-y_{pred}\) |
---|---|---|---|
Alice |
1 |
0 |
1 |
Bob |
0 |
0 |
0 |
Charlie |
0 |
0 |
0 |
Diana |
1 |
0 |
1 |
6.10. 代码:MSE损失
下面是计算MSE损失的代码:
import numpy as np
def mse_loss(y_true, y_pred):
# y_true and y_pred are numpy arrays of the same length.
return ((y_true - y_pred) ** 2).mean()
y_true = np.array([1, 0, 0, 1])
y_pred = np.array([0, 0, 0, 0])
print(mse_loss(y_true, y_pred)) # 0.5
6.11. 训练神经网络,第2部分
现在我们有了一个明确的目标:最小化神经网络的损失。通过调整网络的权重和截距项,我们可以改变其预测结果,但如何才能逐步地减少损失?
为了简化问题,假设我们的数据集中只有Alice:
Name | \(y_{true}\) | \(y_{pred}\) | \((y_{true}-y_{pred})^2\) |
---|---|---|---|
Alice |
1 |
0 |
1 |
那均方差损失就只是Alice的方差:
也可以把损失看成是权重和截距项的函数。让我们给网络标上权重和截距项:
这样我们就可以把网络的损失表示为:
假设我们要优化\(w_1\),当我们改变\(w_1\)时,损失\(L\)会怎么变化?可以用\(\frac{\partial L}{\partial w_1}\)来回答这个问题,怎么计算?
接下来的数据稍微有点复杂,别担心,准备好纸和笔。 |
首先,让我们用\(\frac{\partial y_{pred}}{\partial w_1}\)来改写这个偏导数:
因为我们已经知道 \(L=(1-y_{pred})^2\) ,所以我们可以计算 \(\frac{\partial L}{\partial y_{pred}}\) :
现在让我们来搞定\(\frac{\partial y_{pred}}{\partial w_1}\)。\(h_1,h_2,o_1\)分别是其所表示的神经元的输出,我们有:
由于\(w_1\)只会影响\(h_1\)(不会影响\(h_2\)),所以:
对\(\frac{\partial h_1}{\partial w_1}\),我们也可以这么做:
在这里,\(x_1\) 是身高,\(x_2\)是体重。这是我们第二次看到\(f'(x)\)(S型函数的导数)了。求解:
稍后我们会用到这个\(f'(x)\)。
我们已经把\(\frac{\partial L}{\partial w_1}\)分解成了几个我们能计算的部分:
这种计算偏导的方法叫『反向传播算法』(backpropagation)。
好多数学符号,如果你还没搞明白的话,我们来看一个实际例子。
6.12. 例子:计算偏导数
我们还是看数据集中只有Alice的情况:
Name | \(y_{true}\) | \(y_{pred}\) | \((y_{true}-y_{pred})^2\) |
---|---|---|---|
Alice |
1 |
0 |
1 |
把所有的权重和截距项都分别初始化为1和0。在网络中做前馈计算:
网络的输出是\(y_{pred}=0.524\),对于Male(0)或者Female(1)都没有太强的倾向性。算一下\(\frac{\partial L}{\partial w_1}\):
提示:前面已经得到了S型激活函数的导数\(f'(x)=f(x)*(1-f(x))\)。 |
搞定!这个结果的意思就是增加\(w_1\),\(L\)也会随之轻微上升。
6.13. 训练:随机梯度下降
现在训练神经网络已经万事俱备了!我们会使用名为随机梯度下降法的优化算法来优化网络的权重和截距项,实现损失的最小化。核心就是这个更新等式:
\(\eta\)是一个常数,被称为学习率,用于调整训练的速度。我们要做的就是用\(w_1\)减去\(\eta\frac{\partial L}{\partial w_1}\):
-
如果\(\frac{\partial L}{\partial w_1}\)是正数,\(w_1\)会变小,\(L\)会下降。
-
如果\(\frac{\partial L}{\partial w_1}\)是负数,\(w_1\)会变大,\(L\)会上升。
如果我们对网络中的每个权重和截距项都这样进行优化,损失就会不断下降,网络性能会不断上升。
我们的训练过程是这样的:
-
从我们的数据集中选择一个样本,用随机梯度下降法进行优化——每次我们都只针对一个样本进行优化;
-
计算每个权重或截距项对损失的偏导(例如\(\frac{\partial L}{\partial w_1}\)、\(\frac{\partial L}{\partial w_2}\)等);
-
用更新等式更新每个权重和截距项;
-
重复第一步;
6.14. 代码:一个完整的神经网络
我们终于可以实现一个完整的神经网络了:
Name | Weight | Height | Gender |
---|---|---|---|
Alice |
-2 |
-1 |
1 |
Bob |
25 |
6 |
0 |
Charlie |
17 |
4 |
0 |
Diana |
-15 |
-6 |
1 |
import numpy as np
def sigmoid(x):
# Sigmoid activation function: f(x) = 1 / (1 + e^(-x))
return 1 / (1 + np.exp(-x))
def deriv_sigmoid(x):
# Derivative of sigmoid: f'(x) = f(x) * (1 - f(x))
fx = sigmoid(x)
return fx * (1 - fx)
def mse_loss(y_true, y_pred):
# y_true and y_pred are numpy arrays of the same length.
return ((y_true - y_pred) ** 2).mean()
class OurNeuralNetwork:
'''
A neural network with:
- 2 inputs
- a hidden layer with 2 neurons (h1, h2)
- an output layer with 1 neuron (o1)
*** DISCLAIMER ***:
The code below is intended to be simple and educational, NOT optimal.
Real neural net code looks nothing like this. DO NOT use this code.
Instead, read/run it to understand how this specific network works.
'''
def __init__(self):
# Weights
self.w1 = np.random.normal()
self.w2 = np.random.normal()
self.w3 = np.random.normal()
self.w4 = np.random.normal()
self.w5 = np.random.normal()
self.w6 = np.random.normal()
# Biases
self.b1 = np.random.normal()
self.b2 = np.random.normal()
self.b3 = np.random.normal()
def feedforward(self, x):
# x is a numpy array with 2 elements.
h1 = sigmoid(self.w1 * x[0] + self.w2 * x[1] + self.b1)
h2 = sigmoid(self.w3 * x[0] + self.w4 * x[1] + self.b2)
o1 = sigmoid(self.w5 * h1 + self.w6 * h2 + self.b3)
return o1
def train(self, data, all_y_trues):
'''
- data is a (n x 2) numpy array, n = # of samples in the dataset.
- all_y_trues is a numpy array with n elements.
Elements in all_y_trues correspond to those in data.
'''
learn_rate = 0.1
epochs = 1000 # number of times to loop through the entire dataset
for epoch in range(epochs):
for x, y_true in zip(data, all_y_trues):
# --- Do a feedforward (we'll need these values later)
sum_h1 = self.w1 * x[0] + self.w2 * x[1] + self.b1
h1 = sigmoid(sum_h1)
sum_h2 = self.w3 * x[0] + self.w4 * x[1] + self.b2
h2 = sigmoid(sum_h2)
sum_o1 = self.w5 * h1 + self.w6 * h2 + self.b3
o1 = sigmoid(sum_o1)
y_pred = o1
# --- Calculate partial derivatives.
# --- Naming: d_L_d_w1 represents "partial L / partial w1"
d_L_d_ypred = -2 * (y_true - y_pred)
# Neuron o1
d_ypred_d_w5 = h1 * deriv_sigmoid(sum_o1)
d_ypred_d_w6 = h2 * deriv_sigmoid(sum_o1)
d_ypred_d_b3 = deriv_sigmoid(sum_o1)
d_ypred_d_h1 = self.w5 * deriv_sigmoid(sum_o1)
d_ypred_d_h2 = self.w6 * deriv_sigmoid(sum_o1)
# Neuron h1
d_h1_d_w1 = x[0] * deriv_sigmoid(sum_h1)
d_h1_d_w2 = x[1] * deriv_sigmoid(sum_h1)
d_h1_d_b1 = deriv_sigmoid(sum_h1)
# Neuron h2
d_h2_d_w3 = x[0] * deriv_sigmoid(sum_h2)
d_h2_d_w4 = x[1] * deriv_sigmoid(sum_h2)
d_h2_d_b2 = deriv_sigmoid(sum_h2)
# --- Update weights and biases
# Neuron h1
self.w1 -= learn_rate * d_L_d_ypred * d_ypred_d_h1 * d_h1_d_w1
self.w2 -= learn_rate * d_L_d_ypred * d_ypred_d_h1 * d_h1_d_w2
self.b1 -= learn_rate * d_L_d_ypred * d_ypred_d_h1 * d_h1_d_b1
# Neuron h2
self.w3 -= learn_rate * d_L_d_ypred * d_ypred_d_h2 * d_h2_d_w3
self.w4 -= learn_rate * d_L_d_ypred * d_ypred_d_h2 * d_h2_d_w4
self.b2 -= learn_rate * d_L_d_ypred * d_ypred_d_h2 * d_h2_d_b2
# Neuron o1
self.w5 -= learn_rate * d_L_d_ypred * d_ypred_d_w5
self.w6 -= learn_rate * d_L_d_ypred * d_ypred_d_w6
self.b3 -= learn_rate * d_L_d_ypred * d_ypred_d_b3
# --- Calculate total loss at the end of each epoch
if epoch % 10 == 0:
y_preds = np.apply_along_axis(self.feedforward, 1, data)
loss = mse_loss(all_y_trues, y_preds)
print("Epoch %d loss: %.3f" % (epoch, loss))
# Define dataset
data = np.array([
[-2, -1], # Alice
[25, 6], # Bob
[17, 4], # Charlie
[-15, -6], # Diana
])
all_y_trues = np.array([
1, # Alice
0, # Bob
0, # Charlie
1, # Diana
])
# Train our neural network!
network = OurNeuralNetwork()
network.train(data, all_y_trues)
随着网络的学习,损失在稳步下降。
现在我们可以用这个网络来预测性别了:
# Make some predictions
emily = np.array([-7, -3]) # 128 pounds, 63 inches
frank = np.array([20, 2]) # 155 pounds, 68 inches
print("Emily: %.3f" % network.feedforward(emily)) # 0.951 - F
print("Frank: %.3f" % network.feedforward(frank)) # 0.039 - M
6.15. 接下来?
搞定了一个简单的神经网络,快速回顾一下:
-
介绍了神经网络的基本结构——神经元;
-
在神经元中使用S型激活函数;
-
神经网络就是连接在一起的神经元;
-
构建了一个数据集,输入(或特征)是体重和身高,输出(或标签)是性别;
-
学习了损失函数和均方差损失;
-
训练网络就是最小化其损失;
-
用反向传播方法计算偏导;
-
用随机梯度下降法训练网络;
接下来你还可以:
-
用机器学习库实现更大更好的神经网络,例如TensorFlow、Keras和PyTorch;
-
在浏览器中实现神经网络;
-
其他类型的激活函数;
-
其他类型的优化器;
-
学习卷积神经网络,这给计算机视觉领域带来了革命;
-
学习递归神经网络,常用语自然语言处理;
6.16. TensorFlow版本
import tensorflow as tf
import numpy as np
data = np.array([
[-2.0, -1], # Alice
[25, 6], # Bob
[17, 4], # Charlie
[-15, -6], # Diana
])
all_y_trues = np.array([
1, # Alice
0, # Bob
0, # Charlie
1, # Diana
])
inputs = tf.keras.Input(shape=(2,))
x = tf.keras.layers.Dense(2, use_bias=True)(inputs)
outputs = tf.keras.layers.Dense(1, use_bias=True, activation='sigmoid')(x)
m = tf.keras.Model(inputs, outputs)
m.compile(tf.keras.optimizers.SGD(learning_rate=0.1), 'mse')
m.fit(data, all_y_trues, epochs=1000, batch_size=1, verbose=0)
emily = np.array([[-7, -3]])
frank = np.array([[20, 2]])
print(m.predict(emily))
print(m.predict(frank))
7. 神经网络BP反向传播算法原理和详细推导流程
7.1. 反向传播算法和BP网络简介
误差反向传播算法简称反向传播算法(即BP算法)。使用反向传播算法的多层感知器又称为BP神经网络。BP算法是一个迭代算法,它的基本思想为:
-
先计算每一层的状态和激活值,直到最后一层(即信号是前向传播的);
-
计算每一层的误差,误差的计算过程是从最后一层向前推进的(这就是反向传播算法名字的由来);
-
更新参数(目标是误差变小)。迭代前面两个步骤,直到满足停止准则(比如相邻两次迭代的误差的差别很小)。
本文的记号说明:
-
\(n_l\)表示第\(l\)层神经元的个数;
-
\(f(\cdot)\)表示神经元的激活函数;
-
\(W^{(l)} \in \mathbb{R}^{n_l \times n_{l-1}}\)表示第\(l-1\)层到第\(l\)层的权重矩阵;
-
\(w_{ij}^{(l)}\)是权重矩阵\(W^{(l)}\)中的元素,表示第\(l-1\)层第\(j\)个神经元到第\(l\)层第\(i\)个神经元的连接的权重(注意标号的顺序);
-
\(\textbf{b}^{(l)}=(b_1^{(l)},b_2^{(l)},\dots,b_{n_l}^{(l)})^T \in \mathbb{R}^{n_l}\)表示\(l-1\)层到第\(l\)层的偏置;
-
\(\textbf{z}^{(l)}=(z_1^{(l)},z_2^{(l)},\dots,z_{n_l}^{(l)})^T \in \mathbb{R}^{n_l}\)表示\(l\)层神经元的状态;
-
\(\textbf{a}^{(l)}=(a_1^{(l)},a_2^{(l)},\dots,a_{n_l}^{(l)})^T \in \mathbb{R}^{n_l}\)表示\(l\)层神经元的激活值(即输出值);
关于记号的特别注意:不同的文献所采用的记号可能不同,这将导致不同文献的公式结论可能不同。如Andrew Ng的教程中的\(W^{(l)}\)表示的是第\(l\)层到第\(l+1\)层的权重矩阵。又如,本文用“下标”来标记一个向量的不同分量,而有一些资料却用“上标”来标记向量的不同分量。
下面以三层感知器(即只含有一个隐藏层的多层感知器)为例介绍“反向传播算法(BP算法)”。
三层感知机如图1所示。例子中,输入数据\(\textbf{x}=(x_1,x_2,x_3)^T\)是3维的(对于第一层,可以认为\(a_i^{(1)}=x_i\)),唯一的隐藏层有3个节点,输出数据是2维的。
7.2. 信息前向传播
显然,图1所示神经网络的第2层神经元的状态及激活值可以通过下面的计算得到:
类似的,第3层神经元的状态及激活值可以通过下面的计算得到:
可以总结出,第\(l(2 \le l \le L)\)层神经元的状态及激活值为(下面式子是向量表示形式):
对于\(L\)层感知机,网络的最终输出为\(\textbf{a}^{(L)}\)。前馈神经网络中信息的前向传递过程如下:
7.3. 误差反向传播
“信息向前传播”讲的是已知各个神经元的参数后,如何得到神经网络的输出。但怎么得到各个神经元的参数呢?“误差反向传播”算法解决的就是这个问题。
假设训练数据为\({(x^{(1)},y^{(1)}),(x^{(2)},y^{(2)}),\dots,(x^{(i)},y^{(i)}),\dots,(x^{(N)},y^{(N)})}\),即共有\(N\)个。又假设输出数据为\(n_L\)维的,即\(\textbf{y}^{(i)}=(y_1^{(i)},\dots,y_{n_L}^{(i)})^T\)。
对某一个训练数据\((\textbf{x}^{(i)},\textbf{y}^{(i)})\)来说,其代价函数可写为:
说明1:\(\textbf{y}^{(i)}\)为期望的输出(是训练数据给出的已知值),\(\textbf{o}^{(i)}\)为神经网络对输入\(\textbf{x}^{(i)}\)产生的实际输出。
说明2:代价函数中的系数\(\frac{1}{2}\)显然是不必要的,它的存在仅仅是为了后续计算时更方便。
说明3:以图1所示的神经网络为例子,\(n_L=2,\textbf{y}^{(i)}=(y_1^{(i)},y_2^{(i)})^T\),从而有\(E_{(i)}=\frac{1}{2}(y_1^{(i)}-a_1^{(3)})^2+\frac{1}{2}(y_2^{(i)}-a_2^{(3)})^2\),如果展开到隐藏层,则有\(E_{(i)}=\frac{1}{2}(y_1^{(i)}-f(w_{11}^{(3)}a_1^{(2)}+w_{12}^{(3)}a_2^{(2)}+w_{13}^{(3)}a_3^{(2)}+b_1^{(3)}))^2 + \frac{1}{2}(y_2^{(i)}-f(w_{21}^{(3)}a_1^{(2)}+w_{22}^{(3)}a_2^{(2)}+w_{23}^{(3)}a_3^{(2)}+b_2^{(3)}))^2\),还可以进一步展开到输入层(替换掉\(a_1^{(2)},a_2^{(2)},a_3^{(2)}\)即可),最后可得:代价函数\(E_{(i)}\)仅和权重矩阵\(W^{(l)}\)和偏置向量\(b^{(l)}\)相关,调整权重和偏置可以减少或增大代价(误差)。
显然,所有训练数据的总体(平均)代价可写为:
我们的目标就是调整权重和偏置使总体代价(误差)变小,求得总体代价取最小值时对应的各个神经元的参数(即权重和偏置)。
如果采用梯度下降法(在这里,又可称为“批量梯度下降法”),可以用下面公式更新参数\(w_{ij}^{(l)},b_i^{(l)},2 \le l \le L\):
由上面公式可知,只需求得每一个训练数据的代价函数\(E_{(i)}\)对参数的偏导数\(\frac{\partial E_{(i)}}{\partial W^{(l)}},\frac{\partial E_{(i)}}{\partial b^{(l)}}\)即可得到参数的迭代更新公式。
为简单起见,在下文的推导中,我们去掉\(E_{(i)}\)的下标,直接记为E(要理解它是单个训练数据的误差)。
下面将介绍用“反向传播算法”求解单个训练数据误差对参数的偏导数\(\frac{\partial E}{\partial W^{(l)}}\)和\(\frac{\partial E}{\partial b^{(l)}}\)的过程。我们求解一个简单的情况:图1所示神经网络,最后再归纳出通用公式。
7.3.1. 输出层的权重参数更新
把\(E\)展开到隐藏层,有:
由求导的链式法则,对“输出层神经元的权重参数”求偏导,有: