10  时间换空间:大数据流式计算

在当今信息爆炸的时代,数据量的迅速增长让传统的批处理方法显得越来越力不从心。为了应对不断涌现的大数据,我们需要一种更加高效和灵活的处理方式。流式计算应运而生,它通过实时处理数据流,能够在数据生成的瞬间进行计算和分析,从而大大提高了数据处理的效率和响应速度。在本章中,我们将深入探讨流式计算的基本概念、核心技术以及如何在R语言中实现流式分块计算,帮助你在面对海量数据时,能够更加从容应对,实现真正的“时间换空间”。

10.1 基本思想

在计算机内存有限的情况下,处理大型文件的一种有效方法是文件的分块操作。分块操作是将大型文件分割成多个较小的块,逐块进行处理,而不是一次性将整个文件加载到内存中。具体步骤如下:

  1. 读取文件的一部分:利用文件指针或分块读取方法,逐块读取文件中的数据到内存中。
  2. 处理当前块:对读入内存的这一部分数据进行处理,例如计算统计信息、数据转换等。
  3. 保存处理结果:将处理后的数据或中间结果保存到磁盘或汇总到一个结果集合中。
  4. 继续读取下一块:重复上述过程,直到文件的所有部分都处理完毕。

这种分块操作方法可以有效利用有限的内存资源,避免内存溢出,适用于需要处理大规模数据的场景,如数据分析、机器学习预处理和日志处理等。下面我们将以R语言为例,介绍如何在R中完成大数据流式计算(主要使用分块方法),针对不同的文件类型(如csv、fst等),我们会给出不同的方案。

10.2 csv分块计算

如果我们现在有一份4G的csv文件需要处理,但是我们的计算机内存只有2G,在不扩容的基础上,我们无法一次性把整个文件载入到环境中,那么就必须分块计算。在R语言中,最佳的csv分块计算可以使用readr包的read_csv_chunked函数进行实现。函数中除了指定文件所在路径外,至少还需要设置两个参数:1、callback:对每次分块要做什么处理;2、chunk_size:每次分块处理多少行数据。下面我们用官方文档的例子进行演示:

# 定义每次分块的处理
f <- function(x, pos) subset(x, gear == 3) # 筛选出gear为3的行

# 执行分块读取
read_csv_chunked(
  readr_example("mtcars.csv"),  # 文件所在路径
  DataFrameCallback$new(f),  # 设置每次读取后的操作
  chunk_size = 5) # 设置每次读5行

需要注意的是,在定义函数的时候,我们总是需要在函数中设定两个参数,其中x是分块的数据框,而pos是位置,我们只需要放在那里即可。在上面的操作中,我们相当于是把符合一定条件的行筛选出来,只要筛选出来的数据小于我们的最大内存,那么我们就可以成功进行操作。

除了分块读取之外,其实我们在写出csv的时候,也可以使用readr包中的write_csv函数进行分块写出操作,只需要把append参数设置为TRUE即可。

10.3 fst分块计算

我们在前面的章节中提到了可以使用fst文件格式对数据框进行快速的存取,事实上,fst还支持随机访问(Random access)。这就意味着,我们可以通过制定要读取数据的位置来定向获取文件的部分数据,这一特性为分块计算创造了有利条件。以下是笔者自定义的函数,能够对任意的fst文件进行分块计算:

library(pacman)
p_load(fst,data.table)

import_fst_chunked = \(path,chunk_size = 1e4,chunk_f = identity,combine_f = rbindlist){
  # 计算文件总行数
  parse_fst(path) %>% nrow() -> ft_nrow
  
  # 计算分块起始位置
  seq(from = 1,to = ft_nrow,by = chunk_size)-> start_index
  
  # 计算分块终结位置
  c(start_index[-1] - 1,ft_nrow) -> end_index
  
  # 执行分块计算操作
  Map(f = \(s,e){
    read_fst(path,from = s,to = e,as.data.table = TRUE) %>%  # 对数据进行分块读取
      chunk_f # 对每个分块要进行的计算
  },start_index,end_index) %>% 
    combine_f # 对分块计算结果进行汇总
}

在上面定义的函数中,path代表文件所在路径,chunk_size代表每块读入行数(默认值为1万行),chunk_f是需要对每个块进行的函数操作(默认为返回数据库本身,但是一般来说在分块计算后应该获得的结果要比原来的数据量少得多才有意义),而combine_f则负责对每个分块最后获得的结果进行汇总(获得的结果是一个装着所有分块结果的列表,因此需要对列表中的所有元素进行合并)。

下面,我们会应用这个函数把文件中Integer为7的记录都筛选出来,操作方法如下:

# 构造数据框
nr_of_rows <- 1e7 # 构造1千万行数据
df <- data.frame(
  Logical = sample(c(TRUE, FALSE, NA), prob = c(0.85, 0.1, 0.05), nr_of_rows, replace = TRUE),
  Integer = sample(1L:100L, nr_of_rows, replace = TRUE),
  Real = sample(sample(1:10000, 20) / 100, nr_of_rows, replace = TRUE),
  Factor = as.factor(sample(labels(UScitiesD), nr_of_rows, replace = TRUE))
)

# 写出fst文件
fst_file <- tempfile(fileext = ".fst")
write_fst(df, fst_file)

# 分块筛选出Integer为7的记录,赋值给res
res = import_fst_chunked(fst_file,chunk_f = \(x) x[Integer==7])

这里我们仅仅是进行了筛选操作,如果操作是可以分开执行并汇总的,理论上都可以使用该函数进行实现,比如求和、求最大值、求最小值等。

10.4 数据库分块计算

有的时候,我们的数据保存在数据库中,需要导入到R环境中进行处理,那么我们就需要从数据库中分块导出,然后每次导出的时候进行处理并保存结果,最后再进行结果的汇总统计。这里我们还是以SQLite数据库为例,看看如何对数据库中的结果进行分块。首先我们先配置好环境:

# 加载包
library(RSQLite)

# 建立临时数据库
mydb <- dbConnect(RSQLite::SQLite(), "")

# 写出mtcars表格到数据库中
dbWriteTable(mydb, "mtcars", mtcars)

然后,我们尝试对存入的数据分块进行读出,并显示每一块的行数量是多少:

# 记录查询
rs <- dbSendQuery(mydb, 'SELECT * FROM mtcars')

# 分块处理
while (!dbHasCompleted(rs)) { # 当查询没结束的时候,继续执行循环
  df <- dbFetch(rs, n = 10)  # 每次取出查询的10条记录
  print(nrow(df))  # 打印查询记录的行数量
}
#> [1] 10
#> [1] 10
#> [1] 10
#> [1] 2

我们可以看到,使用dbSendQuery函数不会马上执行,只有采用dbFetch函数的时候才会将数据取出,而其中的n参数可以控制每次取出块的大小。这样,我们就可以对数据库中的数据进行分块操作了。

10.5 小结

在本章中,我们探讨了单机内存有限条件下如何突破空间限制进行大数据分析。对于大于内存的数据,只能每次从文件中提取小于计算机内存的数据进行处理,然后最后再把每次分块的结果汇总到一起。这样,原来有限内存无法解决的问题,就可以通过步步为营的思路,循序渐进地解决掉。这种方法会花费更多的时间,但是会让本来有限内存无法完成的数据处理变为可能。另一种情况下,很多文件可能本身就是分块存储的(比如一个大数据可以按照某一列进行分组,存成多个小文件),这样我们就可以先对每一个块的数据进行处理,然后再对处理结果进行汇总。

10.6 练习

  • 生成一个比本机内存要大的数据框,然后对数据进行求最大值和最小值的操作。在生成过程中,尝试使用readr::write_csv函数,再读取处理过程中尝试使用read_csv_chunked函数。