大数据Join指南-Python,SQL,Pandas,Spark,Dask

如何最佳地Join大型数据集-多种方法的指南

大数据Join指南-Python,SQL,Pandas,Spark,Dask

The Big Data — Photo by Patrick Lindenberg on Unsplash

旋转磁盘,鱼片或使用大数据有很多方法-这是快速指南。

样本数据集

Kaggle电影{ w 4 v据库[8]-25,000万条额定值,用于45,000部电影,数据分为5个文件:

大数据Join指南-Python,SQL,Pandas,Spark,Dask

Foo& 4 K 7 X v )stack.Ai

要查找电影的最高平均收视率,您需要将指向元数据的链接添加到收视率中:

SV k pELECT m.title, avg(r.rating) FROM links l INNE& F kR JOIN to metas m ON m.imdbId=l.imdbId INNERG * b } W = X ) JOIN to r^ W q c h ( z e satb h Z d Q ]ings ON r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5

老派SQL RDBMS

经典的方法是加载数据库,建立索引并运行前面提到的SQL(或使用下面的经典SQL):

SELECT m.title, avg(r7 U z.rating) FROM links l, metas m, ratings r WHERE m.imdbId=l.imdbId anj ] Z . n Ld r.movig q 1 -eId=l.movieId GROUP BY m.title HAVING_ - k d & q 7 A V count(r.rating) > 2 and avg(r.rating) > 4.5

RDBMS的加入通过3种主要方式进行,并带有一些平T t J p d g台变体:

嵌套循环-对表A的每一行查找表B中w k 4的匹配键。B上的索引使查找为O(A * log B),否则联接为SLOW-O(A * B)。

哈希联接—通过查找关键字构建表B的哈希/映射,使联接查找非常快— O(A * 1)

合并排序-对两个表进行排序并在一次通过中合并,除非预先排序,否则不会超快-O8 o k p w(A + B + A log A + B lB % t b K qog B)→O(A log A + B log B)

使用Sqlite3 [1]-创建数据库,表和加载非常容易:

import sqlite3
import csv

c? b k h K S i uon = sqlite3.connect(\'mydatabase.db\')
c = con.cursor()

# load fil; / d e int* D 4 . p Q x $o data array
with open(file, \'r\', encoding=\'utf83 T p G\') as csvW G v ifile:
csvreader = csvj { ] 0 V ` { t 9.reader(csvfile)
fields = next(csvreader) # strip header
for row in csvreader:
data.append(row)

# create table
c.execute(\"CREATE TABLE ratings(movieId text, rating float\")

# execute in batch, commit in 500\'s
c.execute(\"INSERT INTO ratings(movieId, rating) VALUES (?,?)\", dataJ j 0 { i k b V N)

# create indexes
c.exee 5 R S 7 U K }cute(\"create index ridx on ratings(movieId, rating)\")

加载和查询26g L C L H 6 { o :00万行数据大# 1 M w K $ N约需要10分钟的时间(也许我们可以组合调整前两个步骤..)

从csv /磁盘加载-35秒

插入数据库— 8分钟

添加索引— 30秒

按查询分组— 20秒

您还可以使用sqlite3命令y k Y m g 0行来测试和查看查询u V S j N执行计划,如下所示。 在联接列上添加一些额外的索引后,我们的目标查询大约需要21秒才能执行。

大数据Join指南-Python,SQL,Pandas,Spark,Dask

SQLite query execution (from* e } ! Y p cmdline tools)

使用SQL DB是可扩展的,但是比较老套。 接下来,我们将尝试行: 2 + v ^ `家技巧。

Pythl 3 L Q | ^ 0 con —适用于终极黑客

我们可以节省数据库开销,编写数据负载,并在Python中直接繁琐地进行Join:

# double nested loop join
def merge(links, ratings, mett h r q v mas):
merged = []
for link in linkP S d x f Xs[1]:
mlink = link.copy()
md * } D X a | {link += ([\'\',\'\']) # rating and nam) 4 w & / F T 1 Ye
for rating in ratings[1]:
if (mlink[0] == rating[1]):
mlik T % & O L {nk[3] = rating[2]
break
for meta in metas[1]:
if (mlinkv L K _[1] == meta[6]): # stripped tj 1 J ^ ( Vt off meta imdb columns FYI
mlink[4] = meta[20]
break
merged.append(mlink)
retA H 4 C v Kurn merged

\" merge()\"是一个没有索引的嵌套循环连接。 循环必须扫描metas和links表以获取每个等级(26m * 50k * 2)。 100k条评论需要5分钟,因此2600万条评论将需F , w @要很长时间...

\" merge_wmap()\"是一个哈希联接-我们为元数据和链接构建了一个Map映射,从而产生了O(n * 1)性能。_ ^ C _ t c W ( H 加入2600万行只需3秒!

# opG f B G / Etimized single loop + w/ hash lookups of ratings and meta3 g n k , ( 8data
def merge_wmap(links, ratings, m| t c I y fetas)Q w = o -> []:
rat7 , l U K I eings_map = make_map(ratings[1], 1)
metas_map = make_map(metas[1], 6)
merged = []
for link in links[1]:
mlink = link.copy() + [\'\',\'\']
mlinc _ C I g uk[3] = ratings_m3 o M a Q : W Kap.get(link[0])[2] if ratings_map.get(link[0]) else \'\'
mlink[4] = met5 9 k c H 4as_map.get(link[1])[20] if metas_map.get(linkR ) b Z &[1]) else2 ( 6 e V w h \'\'
merB [ Fged.append(mlink)
return merged0 m m ) % D

我没有实现分组过滤器-相对较快(需要26m行结果的排序-扫描-结合)-我~ K g L 3 l R T估计加载和处理的总时间约为0:53

将原始CSV加载到数组-35秒

手动合并索引— 3秒

手动分组和过滤器-15秒(待定〜估算)

原始Python很快但很丑。 全速本地P# n D 2 d 5 ZC并完全控制所有错误。

Pandas数据框

Pandas [2]是Python上用于数据准备的事实上的软件包。 极其快7 # G , x速且易于使用,我们可以用最少的代码进行加载,连接和分组:

import pandas as pd

# load files
ratings_df = pd.read_csv(\'ratings.csv\')
metas_df = pd.read_csv(\'movies_metadato u l . V s 9 qa.csv\')
links_df = pd.read_csv(\'links.csv\')

# 1st join
merged_df = pd.merge(links_df[[\'movieId\',\'imdbId\']],
rati? p r w s B Zngs_df[[\'movieId\',f 2 # c\'rating\']], on=\'movieId\', how=\'riS W 4 _ h A y ght\8 # b J - N h Y q',
validi x y g jate=\'one_to_many\')

# 2nd join
merged_df = pd.merge(merged_df, metas_df[[\'/ 7 ? p Rtitle\',\'imdb_id\']],
left_on=\'imdbIdI a { e\', rig] P ^ 7 * rht_on=\'imdb_id\', how=\'inner\')

# group-by having
grouped_df = merged_df[[\'title\',\'rating\']].groupby(\'title\').
agg(Mean=(\'rating\',n ! q \'mean\' ), C0 9 w D J y Wount=(z s f @ 1 p\'rating\',\'count\')).
query(\'Mean > 4.5 and Count > 2\')

Pandas很聪明。 您无需预定义哈希或索引,它似乎可( 0 O B &以动态生成优化连接所需的内容。 最大的限制是它存在于单个计算机上。 在大约0:17的时间内处理26m行,使用更少的代码,并且没有外部系统(数据库,集群等)。

将3个csv加载到DataFq p 4rames — 5秒

加入3个DataFrames — 8秒

加入,分组和过滤-+4秒

Pands文件加载比我自定义的py〜35sec和5sec快得多! 要显示不是黑客,请使用库。 从理论上讲,Pandas是它的单线程/进程(在我的TaskManager中看起来不是这样),因此数据集的大小受PC内存的限制。

尽管如此,Pandas是处理中小型数据的最终方法,但我B , 1 c : P K们需要大数据!

Spark Clusters FTWX + r(致胜)

SQL很棒,但是并行E N & O k y化和破解能力有限。 Python和Pandas超= 7 k $ 5 D ~ 4级灵活,但缺乏可伸缩性。 Apache Spark [5]是在大数据上并行化内存中操作的实际方法。

Spark有一个称为DataFrame的对象(是另一个!),它就像Pandas DataFrame一样,甚至可以从中加载/窃取数据(尽管您可能应该通过HDFS或Cloud加载数据,以避免BIG数据传输问题):

from pyspark.sql importP c { o Z ? 8 r SparkSession
from pyspark im= _ ]port SparkContext, SparkConf

# context config/setupL = 0 %
sc = SparkContext(conf=SparkConf().setMaster(\'local[8]\'))
spark = SparkSession.builder.getOrCreate()
spark.conf.set(\"spark.sql.executV ; l 7 ! #ion.arrow.enabled\", \"true\q Z I Y ~")6 k g U L n C 4

# copy pandas df\'s into spark df\'s
t1 = spark.createDataFrame(ratings_df)
t2 = spark.createDataFrame(links_df)
tS g `3 =E D 7 P 8 & sp7 E K Z Sark.createDataFrame(metas_df)

# normal spark join (runs on cluster default partioning)
df = t1.join(t2, t1[\'movieId\']==t2[\'movieId\']).
join(t3, t2[\'imdbId\'] == t3[^ ` E F a g 1\'imbd_id\'])

# broadcast smM L 4 #aller tables to worker
bc_df = t1.join(func.broadcast(t2), t1[\'movieId\']==t2[\'mov: D E X { K ` 7ieId\']).
join(func.broadcast(t3), t2[\'imdbId\'] == t3[\'imbd_id\'])w A v R * H R c #

# group by results
df.groupBy(\'g y P h 4title\').agg(func.mean(\'ratingG N I\').
alias(\'avg_rating\'),func.count(\'rating\').
alias(\'rm 3 V 4 # (_count\')).filter(\'r_count >q @ $2\').
filte6 ~ a . b ir(\'avg_rating > 4.5\' ` q 5 z [).show(F O 3 k 2 : K 9 |)

G D 9 n编写了两个Spark连接方法。 两者并行运行。 默认模式(第15行)将对数据进行分区,并D J & c T ; : = }在群集节点上随机(分散)数据。 后者的\"广播\"模式(第18行)仅复制一次较小的表,并且仅分区并发n = ;送较大的表内容。 使用较小的联接表,广播模式可以更快。

Spark将工作划分为多个工作节点(JVM(设置为8,以匹配我的CPU内核数)),以划分并N . ? F |征服到一个聚合中。 Spark代码和结果输出如下:

d- U M ` { { c /f.groupBy(\'ti- U M Z ltle\').agg(func.mean(\'rating\').   
a$ ; 3 `lias(\'avg_rating\'),func.count(\'rating\').
alias(\'r_count\')).filter(\'r_count >2\').
filter(\'avg_rating > 4.5\').show()

大数据Join指南-Python,SQL,Pandas,Spark,Dask

Output from Spark\'s join and group-by operation on 26m rows

性能摘要

(我的笔记本电脑的非实验室认证结果)

首先请注意将3个数据集连# k Q m接在一起的运行时间:

大数据Join指南-Python,SQL,Pandas,Spark,Dask

Performance for joins

令人惊讶的是,原始的Python解决方案是最快的吗? 哈克m Q ! + - ! H C哈克!

顶级分组的最终结果(包括Spark):

大数据Join指南-Python,SQL,Pandas,Spark,Dask

Performance for joinv s S H 2 B and group-by + filtering

总结:

Pandas具有卓越* y p Y ? P的快速性} G B ;和高效性,因此您拥有核@ 7 e F心的记忆力。 在某些时候,Python / Pandas将耗尽内存并崩溃。

尽管集群管理可能很棘手,但Spark是一个很好的扩展解决方案。 内存中分布式处理,对作业和数据进行分区以及分区存储策略(k ; k ! R lHDFS或其他)是正确的方向。

RDBMS可靠,但在移动数据和处理方面有扩展限制

下一章将进一步介绍Sg K x K 2 V ~ m 7park。 糟糕,我忘3 g S 9记了Dask(原生Pythp 6 E r t ) Con群集),也许是下次。

SQLite3资源配置文件

大数据Join指南-Python,SQL,Pandas,Spark,Dask

Only 2 co} $ , p kresx 9 C { n really active, extra I/O (even though its mo+ C z I # _ @ 2stly cached)` K r

Panda q t P ]s资源简介

大数据Join指南-Python,SQL,Pandas,Spark,Dask

Surprisingly good CPU utilization fo# K u J v ~ ar what I thought was as sn k U l T Y Vingle threaded/task process ?

Spark资源配置文件(8个工作程序,10个分区)

大数据Join指南-Python,SQL,Pandas,Spark,Dask

All cores uti^ l , t +lized w/ 8 workers — good CPU and memory distribution

以上数据来自我的MSFT Surface Laptop 3 — i7- I h d z M d 6 / 16gb / 256gb SSD

参考和启示

[0]测试代码的完整来源(不仅仅是要点)-DougFoo的GitHub

[1] SQLite Python指南-官方Python文档

[a T B L2]Pandas指南-10分钟教程

[3]较 Q p N ,旧的分析SQLite vs Pandas — Wes McKiney博客

[4] Spark加入DB Decu 4 ;k — Da! k AtaBricq R ^ & @ 0 bks演示

[5]关于Spark的详细介绍-A. Ia0 } q ! E C B Vlenti的TDS文章

[6] PYArroC | @ E - z Cw用于在Spark中快速加载DataFrame — Bryan Cutler IBM

[7]在10分钟内安装PySpark Win — TDS文章作者:乌玛格(Uma G)

[8]电影评论文件-Kaggle数据集

(本文翻w w F译自Doug Foo的文章《Guide to Big Dat F E ~ H z O a #a Joins — Python, SQL, Pandas, Spark, Dask》,参考:https://t$ } s S 3 b O y @owardsdatascience.com/guide-to-big-data-joins-python-sql-pandas-spark-dask-51b7f4fec810)

上一篇

美味做法推荐,豆干炒腊肉,酸菜鸡,枸杞炒丝瓜,鲜味带鱼

下一篇

原创 古代没有雷达,行军打仗时,是如何判断敌方有多少兵力的呢

评论已经被关闭。

插入图片
返回顶部