实时流量统计

  • 基本需求
    • 从web服务器的日志中,统计实时的访问流量
    • 统计每分钟的ip访问量,取出访问量最大的5个地址,每5秒更新一次
  • 解决思路
    • 将apache服务器日志中的时间,转换为时间戳,作为Event Time
    • 构建滑动窗口,窗口长度为1分钟,滑动距离为5秒

数据准备

将apache服务器的日志文件apache.log复制到资源文件目录src/main/resources下,我们将从这里读取数据。

代码实现

我们现在要实现的模块是“实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。我们在这里实现最基本的“页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问url的次数。

具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。

完整代码如下:

object ApacheLogAnalysis {

  case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

  case class UrlViewCount(url: String, windowEnd: Long, count: Long)

  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val stream = env
      // 文件的绝对路径
      .readTextFile("apache.log的绝对路径")
      .map(line => {
        val linearray = line.split(" ")
        // 把时间戳ETL成毫秒
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(linearray(3)).getTime
        ApacheLogEvent(linearray(0),
                       linearray(2),
                       timestamp,
                       linearray(5),
                       linearray(6))
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](
          Time.milliseconds(1000)
        ) {
          override def extractTimestamp(t: ApacheLogEvent): Long = {
            t.eventTime
          }
        }
      )
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(5))
      .print()

    env.execute("Traffic Analysis Job")
  }

  class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  class WindowResultFunction
    extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, context.window.getEnd, elements.iterator.next()))
    }
  }

  class TopNHotUrls(topSize: Int)
    extends KeyedProcessFunction[Long, UrlViewCount, String] {

    lazy val urlState = getRuntimeContext.getListState(
      new ListStateDescriptor[UrlViewCount](
        "urlState-state",
        Types.of[UrlViewCount]
      )
    )

    override def processElement(
      input: UrlViewCount,
      context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
      collector: Collector[String]
    ): Unit = {
      // 每条数据都保存到状态中
      urlState.add(input)
      context
        .timerService
        .registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
      out: Collector[String]
    ): Unit = {
      // 获取收到的所有URL访问量
      val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (urlView <- urlState.get) {
        allUrlViews += urlView
      }
      // 提前清除状态中的数据,释放空间
      urlState.clear()
      // 按照访问量从大到小排序
      val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
        .take(topSize)
      // 将排名信息格式化成 String, 便于打印
      var result: StringBuilder = new StringBuilder
      result
        .append("====================================\n")
        .append("时间: ")
        .append(new Timestamp(timestamp - 1))
        .append("\n")

      for (i <- sortedUrlViews.indices) {
        val currentUrlView: UrlViewCount = sortedUrlViews(i)
        // e.g.  No1:  URL=/blog/tags/firefox?flav=rss20  流量=55
        result
          .append("No")
          .append(i + 1)
          .append(": ")
          .append("  URL=")
          .append(currentUrlView.url)
          .append("  流量=")
          .append(currentUrlView.count)
          .append("\n")
      }
      result
        .append("====================================\n\n")
      // 控制输出频率,模拟实时滚动结果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}