掘金 人工智能 05月07日 18:13
【SQL 周周练】爬取短视频发现数据缺失,如何用 SQL 填充
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了如何使用SQL对短视频点赞量数据进行插值补全,以解决爬虫数据缺失问题。文章提供了前向填充、后向填充、相邻平均数填充和相邻分位数填充四种简单补全方法,并详细阐述了每种方法的SQL实现思路和代码。此外,文章还分享了使用Gompertz函数模拟短视频点赞量变化,以及使用PyHive包操作Hive的实践经验,旨在帮助读者掌握SQL数据补全技巧,提升数据分析能力。

🚀 **前向填充**:使用`last_value`窗口函数,以前面最近的一个非空值来填充缺失的点赞量。若找不到非空值,则用0兜底,确保数据连续性。

⏪ **后向填充**:运用`first_value`窗口函数,以后面最近的非空值来填充缺失值。如果向后查找失败,则向前查找非空值进行兜底,保证数据补全的完整性。

➗ **相邻平均数填充**:结合前向和后向填充,取前后最近非空值的平均值填充缺失值。在计算平均值时,需要谨慎处理找不到非空值的情况,以确保结果的准确性。

📊 **相邻分位数填充**:相比平均数填充,分位数填充更能保持线性增长关系。通过计算缺失值所在的分位数位置,进行线性插值,使补全后的数据更符合实际情况。

大家好,我是“蒋点数分”,多年以来一直从事数据分析工作。从今天开始,与大家持续分享关于数据分析的学习内容。

本文是第 5 篇,也是【SQL 周周练】系列的第 4 篇。该系列是挑选或自创具有一些难度的 SQL 题目,一周至少更新一篇。后续创作的内容,初步规划的方向包括:

后续内容规划

1.利用 Streamlit 实现 Hive 元数据展示SQL 编辑器、 结合Docker 沙箱实现数据分析 Agent

2.时间序列异常识别、异动归因算法

3.留存率拟合、预测、建模

4.学习 AB 实验、复杂实验设计等

5.自动化机器学习、自动化特征工程

6.因果推断学习

7.……

欢迎关注,一起学习。

第 4 期题目

题目来源:自创题目,曾经在工作中遇到过该问题

一、题目介绍

公司市场部找到一些达人在抖音、快手等平台进行短视频营销,需要监测视频的点赞量。公司内有一位专职的爬虫工程师,他的项目也很多。因此很难对该项目爬虫数据提供高质量的维护,会出现一些字段缺失的情况。

我们将问题简化,有一张表记录了爬虫抓取的短视频点赞量数据,其中部分日期的点赞量是缺失的。请你利用 SQL 将这些数据补齐,即“插值”。

列名数据类型注释
video_idstring短视频id
dtstring日期
likes_numint点赞量(用来对比结果,不要直接用)
show_likes_numint展示点赞量(用来补全数据)

用 SQL 实现几种比较简单的插值方法,复杂的方法可以利用 Hive 中的 transform 函数调用 Python 脚本来实现(后面哪期会根据这个点水一篇文章)

本文实现的简单补全方法有:

1.前向填充,使用前面最近的一个非空值来填充2.后向填充,使用后面最近的一个非空值来填充3.相邻的平均数填充,使用前后最近的非空值,取两个数的平均数填充4.相邻的分位数填充,使用前后最近的非空值,缺失值根据分位数来填充

额外说明:这四种方法都依赖于缺失值邻近的前后非空值,需要存在这样的非空值。如果该非空值不存在,比如短视频第一天发布就没有爬取到点赞量 —— 这样没有办法,找到它之前的非空点赞量。我本文的处理方法是将它“视为”前一天发布,或者说增加一个前一条点赞量为零的数据(还有其他的处理方法,我这里只提出一种)。

这条增加的数据不需要显式存在,只不过是在数据处理时兜底的逻辑等效于它。而如果短视频缺少的是最后几天的数据,比如某一天开始后面一直缺失数据,这样就将最后一个有数据的点赞量“顺延”下去。这 4 种填充方法,都用这样的逻辑兜底。

二、题目思路

想要答题的同学,可以先思考答案🤔。

.……

.……

.……

我来谈谈我的思路:1.前向填充,使用前面最近的非空值来填充。使用 last_value 窗口函数来实现,注意 last_value 支持两个参数,其中第二个参数设置为 true 则在寻找的时候跳过 null;注意 rows 的范围,另外如果前面实在找不到非 null 值,用 0 来兜底。

2.后向填充,使用后面最近的非空值来填充。使用 first_value 窗口函数来实现,同样 first_value 也是支持两个参数,其中第二个参数设置为 true 则在寻找的时候跳过 null;这个 rows 的范围更要注意。如果后面实在找不到非 null 值,用前一个非 null 值兜底。所以这里要同时往前往后查找。

3.相邻的平均数填充,融合了前两种方法,前向和后向数据都要寻找,找到后求平均值,这里要更加小心的处理找不到的情况。

4.相邻的分位数填充,是上一种方法的改进。比如 2 个有效的点赞量中间缺少了 3 天的数据,如果这 3 天的数据都用这 2 个有效值的平均值来填充,则相当于这几天的点赞数没有变化,这逻辑不太现实。

采用分位数的方法保持线性增长的关系去填充,比上一种方法更好。注意如果真的是这 2 个有效点赞量数据一致,也就是假设这几天点赞量数据停止变化。平均数和分位数填充,计算的结果是能“兼容”这种情况。

下面,我用 NumPyScipy 生成模拟的数据集:

三、生成模拟数据

只关心 SQL 代码的同学,可以跳转到第四节(我在工作中使用 Hive 较多,因此采用 Hive 的语法)

模拟代码如下:

    定义模拟逻辑需要的常量,定义随机数发生器:
import numpy as npfrom scipy import statsimport pandas as pdimport datetimeSEED = 2025rng = np.random.default_rng(SEED)# 开始日期START_DATETIME = datetime.datetime(2025, 5, 1)# 短视频数量VIDEO_NUM = 100# 一般最高点赞量GENERAL_HIGHEST_LIKES_NUM = 1_000_000# 一般最低点赞量GENERAL_LOWEST_LIKES_NUM = 1
    使用 Gompertz 函数模拟短视频点赞量每日变化。大家一般都知道用 S 型曲线模拟这类增长但有上限的数据,最常见的就是 Logistic 函数。我这里用 Gompertz 函数纯粹是以前没用过,尝尝鲜。工作中肯定是用这两个函数的拟合效果来对比。网上能搜到大量文章从数学角度对两者进行对比。我这里偷懒就不研究了,大家可以自行搜索:
# 参考正态分布 3-sigma,转换到对数正态分布的范围sigma = np.log(GENERAL_HIGHEST_LIKES_NUM / GENERAL_LOWEST_LIKES_NUM) / 6# 如果在 JupyterLab 中分为不同单元格执行,# 必须重置随机数生成器,否则不能复现同样结果# rng = np.random.default_rng(SEED)K_arr = stats.lognorm.rvs(    s=sigma,    loc=0,    scale=np.sqrt(GENERAL_HIGHEST_LIKES_NUM * GENERAL_LOWEST_LIKES_NUM),    size=VIDEO_NUM,    random_state=rng,)K_arr = np.round(K_arr, 0)# 参数 a 随机生成a_arr = rng.uniform(low=0.85, high=0.95, size=VIDEO_NUM)# 参数 b 随机生成b_arr = rng.uniform(low=0.75, high=0.85, size=VIDEO_NUM)# 参数 t0 随机生成t0_arr = rng.choice([7,8,9,10,11,12], size=VIDEO_NUM, replace=True, p=[0.05,0.1,0.25,0.35,0.2,0.05])# 定义 Gompertz 函数,为什么不选常见的 Logistic 函数# 纯粹为了多尝试尝试以前没玩过的def gompertz_func(t, K=100000, a=0.9, b=0.8, t0=10):    return K * np.power(a, np.power(b, t-t0))

Gompertz 函数其中的 KK 代表极限值,在我模拟的场景就是各个短视频点赞的上限。因此我采用对数正态分布模拟这个 KK,我在第 1 节的代码中定义了一般最高/低的点赞量,其实就是借鉴正态分布的 3-sigma 方法。只不过 ln(x)\ln(x) 服从正态分布,取 [μ3σ,μ+3σ][\mu - 3\sigma, \mu + 3\sigma] ;那么 xx 就应该取 [eμ3σ,eμ+3σ][e^{\mu-3\sigma}, e^{\mu+3\sigma}],我们将其分别记为 aabb,则 σ=lnblna6\sigma = \frac{\ln{b} - \ln{a}}{6},而 μ=lna+lnb2\mu = \frac{\ln{a} + \ln{b}}{2}

scipy.stats.lognorm 中,s 是形状参数,对应正态分布的标准差 σ\sigmascale 是尺度参数,对应正态分布的指数均值 eμe^{\mu}。根据上面的推导结果。s=lnblna6\text{s} = \frac{\ln{b}-\ln{a}}{6}scale=ab\text{scale} = \sqrt{ab}。这就是我代码中对数正态分布参数制定的逻辑。

根据 Gompertz 函数的定义,参数 aabb 都是在 (0,1)(0, 1) 之间,我自己手动测试绘图,觉得 aa 在 0.9 附近,bb 在 0.8 附近,函数形状比较靠谱。实际工作中,是应该用数据来拟合去推算参数的取值

    定义随机缺失数据的标识,注意点赞量是整数,四舍五入后转为整数。将前面生成的数据转为 pd.DataFrame,并输出为 csv 文件:
days = np.arange(1, 31)# 定义随机缺失;1 - 表示缺失数据random_missing_flag = rng.choice([0, 1], size=VIDEO_NUM * len(days), p=[0.7, 0.3])video_data = { 'video_id': [], 'dt': [], 'likes_num': [], 'random_miss': random_missing_flag}for i, (k, a, b, t0) in enumerate(zip(K_arr, a_arr, b_arr, t0_arr)):    # 视频 id 数字部分不低于 3 位,补零    video_data['video_id'].extend([f'video_{i:03}']*len(days))    video_data['dt'].extend(days)    video_data['likes_num'].extend(gompertz_func(days, k, a, b, t0))    df = pd.DataFrame(video_data)# 四舍五入并转为整型df['likes_num'] = np.round(df['likes_num'], 0).astype(int)df['dt'] = df['dt'].apply(lambda d: START_DATETIME + datetime.timedelta(days=(d-1)))df['show_likes_num'] = df.apply(lambda r: np.nan if r['random_miss'] == 1 else r['likes_num'], axis = 1)df.drop(['random_miss'], axis=1, inplace=True)out_csv_path = "dwd_short_videos_likes_num_missing_data_from_crawler.csv"df.to_csv(out_csv_path, header=False, index=False, encoding='utf-8-sig')
    如果表存在则删除,创建新的 Hive 表,并将数据 load 到表中:
from pyhive import hive# 配置连接参数host_ip = "127.0.0.1"port = 10000username = "蒋点数分"with hive.Connection(host=host_ip, port=port) as conn:    cursor = conn.cursor()        hive_table_name = "data_exercise.dwd_short_videos_likes_num_missing_data_from_crawler"    drop_table_sql = f"""    drop table if exists {hive_table_name}    """    print(drop_table_sql)    cursor.execute(drop_table_sql)    create_table_sql = f"""    create table if not exists {hive_table_name} (        video_id string comment "短视频id",        dt string comment "日期",        likes_num int comment "点赞量(用来对比结果)",        show_likes_num int comment "展示点赞量(用来补全数据)"    )     comment "短视频点赞量缺失数据,用来练习 SQL 补全数据 | author: 蒋点数分 | 文章编号:0a94d809"    row format delimited fields terminated by ","    stored as textfile    """    print(create_table_sql)    cursor.execute(create_table_sql)    import os    load_data_sql = f"""    load data local inpath "{os.path.abspath(out_csv_path)}"    overwrite into table {hive_table_name}    """    print(load_data_sql)    cursor.execute(load_data_sql)    cursor.close()

我通过使用 PyHive 包实现 Python 操作 Hive。我个人电脑部署了 HadoopHive,但是没有开启认证,企业里一般常用 Kerberos 来进行大数据集群的认证。

四、SQL 解答

1.前向填充的 sql 语句,如果使用 last_valuerows 的范围是 between unbounded preceding and 1 preceding。如果省略这部分,只保留 order by dt asc,则默认为 between unbounded preceding and current row 从最终效果来说是一致的,但是前者写法表述更准确

with calc_exist_likes_num as (    select       video_id, dt, likes_num, show_likes_num    -- 找前面的非空点赞量    -- 注意第二个参数为 true 表示跳过 null    , last_value(show_likes_num, true) over(partition by video_id order by dt asc             rows between unbounded preceding and 1 preceding) as last_exist_likes_num    from data_exercise.dwd_short_videos_likes_num_missing_data_from_crawler)select  video_id, dt, likes_num, show_likes_num-- nvl 只支持 2 个参数;使用 coalesce -- 按照顺序,返回第一个非 Null 的值-- 根据我写的兜底逻辑,如果向前寻找非空数值没有,则用 0 兜底, coalesce(show_likes_num, last_exist_likes_num, 0) as forward_fillfrom calc_exist_likes_num

部分结果验证:

2.后向填充的 sql 语句,如果使用 first_valuerows 的范围是 between 1 following and unbounded following。注意 first_valuelast_value 都是跟 order by dt 的顺序有关,完全可以使用 desc 降序来切换另一个函数。

with calc_exist_likes_num as (    select       video_id, dt, likes_num, show_likes_num    -- 前后邻近的第一个非空点赞量    -- 注意第二个参数为 true,表示跳过 null    , last_value(show_likes_num, true) over(partition by video_id order by dt asc            rows between unbounded preceding and 1 preceding) as last_exist_likes_num        , first_value(show_likes_num, true) over(partition by video_id order by dt asc        rows between 1 following and unbounded following) as next_exist_likes_num    from data_exercise.dwd_short_videos_likes_num_missing_data_from_crawler)select  video_id, dt, likes_num, show_likes_num-- nvl 只支持 2 个参数;使用 coalesce -- 按照顺序,返回第一个非 Null 的值-- 根据我写的兜底逻辑,如果向后寻找非空数值没有,则用前面的第一个非空兜底,最后用 0 兜底, coalesce(show_likes_num, next_exist_likes_num, last_exist_likes_num, 0) as backward_fillfrom calc_exist_likes_num

部分结果验证:

3.相邻平均数填充的 sql 语句,寻找前后相邻的非空值逻辑,这里不再赘述。对前后相邻的非空值求平均,注意这里的兜底逻辑。首先上一个非空点赞量如果不存在,那就填充零,因此求平均的分母这部分的 “1” 必然存在;如果下一个非空点赞量不存在,可以将其当成零,那么分母求平均时,它就不起作用,它的分母部分是 “0”;最后结果注意四舍五入(如果写成显式的判断逻辑也可以,需要引入 ifcase when 语句)。

注意结果要取整,我这里不取整,是为了跟大家展示结果时去反向验证:

with calc_exist_likes_num as (    select       video_id, dt, likes_num, show_likes_num    -- 前后邻近的第一个非空点赞量    -- 注意第二个参数为 true,表示跳过 null    , last_value(show_likes_num, true) over(partition by video_id order by dt asc            rows between unbounded preceding and 1 preceding) as last_exist_likes_num        , first_value(show_likes_num, true) over(partition by video_id order by dt asc        rows between 1 following and unbounded following) as next_exist_likes_num    from data_exercise.dwd_short_videos_likes_num_missing_data_from_crawler)select  video_id, dt, likes_num, show_likes_num-- 如果一开始的数据没有,就按照零计算;因此“上一个点赞量必然存在”-- 如果下一个点赞量不存在,那么就按照零计算,实际就等于不算-- 注意四舍五入取整,我这里不取整,是为了给大家更方便的验证结果, nvl(show_likes_num,  (nvl(last_exist_likes_num, 0) + nvl(next_exist_likes_num,0))     / (1 + if(next_exist_likes_num is not null, 1, 0)) ) as half_fill_likes_numfrom calc_exist_likes_num

部分结果验证:

4.相邻分位数填充的 sql 语句,基本逻辑跟平均数一样;但是不是简单求平均,而是需要计算每个缺失值所在的分位数位置,来“线性插值”。这里稍微推导一下再写 sql:我将缺失值的上一个邻近非空值记为 s,下一个邻近非空值记为 e;因为是分位数,还要考虑位置,将上一个邻近非空值的序号记为 m,下一个邻近非空值记为 n,这个缺失值的位置记录为 i。则根据推导它的位置分位数应该是 (i-m)/(m-n),我们再推导它的值应该是 s + (e-s)*(i-m)/(m-n) 化简后为 s*(n-i)+e*(i-m)。在 sql 中,我利用日期充当序号,序号之间的减法结果,我用 datediff 函数来处理,代码如下:

with calc_exist_likes_num as (    select      video_id, dt, likes_num, show_likes_num    -- 注意将 first_value 和 last_value 的第二个参数设置为 true 表示跳过 null    , last_value(show_likes_num, true) over(partition by video_id             order by dt rows between unbounded preceding and 1 preceding) as last_exist_likes_num    , first_value(show_likes_num, true) over(partition by video_id             order by dt rows between 1 following and unbounded following) as next_exist_likes_num    -- 取出前后存在的点赞量对应的日期端点    -- 注意要根据点赞量同时将日期设为 null    , last_value(if(show_likes_num is null, null, dt), true) over(partition by video_id             order by dt rows between unbounded preceding and 1 preceding) as last_exist_likes_dt    , first_value(if(show_likes_num is null, null, dt), true) over(partition by video_id             order by dt rows between 1 following and unbounded following) as next_exist_likes_dt    , min(dt) over(partition by video_id) as base_dt    from data_exercise.dwd_short_videos_likes_num_missing_data_from_crawler)select  video_id, dt, likes_num, show_likes_num-- 推导见说明,此处不四舍五入也是为了验证结果, nvl(show_likes_num, if( e is null, s, (s * n_i + e * i_m) / (n_i + i_m))) as percentile_fill_likes_numfrom (    select      video_id, dt, likes_num, show_likes_num    , nvl(last_exist_likes_num,0) as s    , next_exist_likes_num as e    , if(last_exist_likes_num is not null, datediff(dt, last_exist_likes_dt), datediff(dt, base_dt)+1) as i_m    , datediff(next_exist_likes_dt, dt) as n_i    from calc_exist_likes_num) t-- 经过一通处理,发现原来的顺序被破坏,重新排序order by video_id asc, dt asc

注意里面的兜底逻辑,比如取一个 min(dt) 作为如果找到前面的非空值,则将其设置为更早日期的前一天,求 i_m 即 i-m 时 datediff(dt, base_dt)+1+1 就是这么来的。如果 e 不存在,也就是下一个非空值找不到,直接用上一个非空值“顺延”下去。

部分结果验证:


😁😁😁我现在正在求职数据类工作(主要是数据分析或数据科学);如果您有合适的机会,恳请您与我联系,即时到岗,不限城市。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

SQL 数据插值 数据补全 窗口函数 Hive
相关文章