Skip to content
On this page

记一次简单的数据分析


📍 2022-10-08 🏷️ #数据分析 #Python #openpyxl

9 月底,写了一个小项目,尝试使用脚本批量处理 Excel 数据,过程曲折,值得记录。

需求

把某一年的若干张表格汇总成一张表。

这其实是一个简单的需求。当然,前提是所有的数据应是结构化的。那么使用 Excel 来完整汇总工作,可以很容易就上手。

问题在于,不同的表结构差异明显,难以通过直接复制、粘贴完成汇总。

举个例子,有如下 Sheet1, Sheet2, Sheet3 表格(均为随机生成):

Sheet1姓名Data1Data2Data3
1姚娜2593.922772.2115455
2禄子蕙1242.415384.3243576
...............
Sheet2姓名Data5Data6
1汪秋灵593.19272.72
2益彦杉122.24538.43
............
Sheet3姓名Data7Data8Data9Data10
1甄谷山253.92272.2115453525
2松月122.41584.3243726752
..................

其中,Sheet1 需要汇总的列有:

  • 姓名
  • Data3

Sheet2 需要汇总的列有:

  • 姓名
  • Data5

Sheet3 需要汇总的列有:

  • 姓名
  • Data7
  • Data8
  • Data9
  • Data10

肉眼可见:姓名一列没有全局的编号或序号;需要汇总的数据列名也各不相同。

思路

在拿到原始数据之前,最先想到的笨办法就是给每一张表写一个解析逻辑,根据需要输出结构统一的汇总表格。相当于半结构化输入,结构化输出。

进一步思考会发现,解析后直接输出表格,不利于后期二次数据分析或者其他运算。更好的实现方法应该是建立数据库,每解析一行,就在数据库新增一条记录。在此基础上,汇总表格其实只是一个函数的输出结果。

实现

依赖库

  • openpyxl
  • sqlalchemy

Database 表结构

python
# 员工表

class Stuff(Base):
    __tablename__ = 'stuff_account'

    id = Column(Integer, primary_key=True)
    name = Column(String(10), unique=True)
    incomes = relationship('Income', order_by=Income.id, back_populates='stuff')
# 员工表

class Stuff(Base):
    __tablename__ = 'stuff_account'

    id = Column(Integer, primary_key=True)
    name = Column(String(10), unique=True)
    incomes = relationship('Income', order_by=Income.id, back_populates='stuff')
python
# 收入表

class Income(Base):
    __tablename__ = 'income_record'

    id = Column(Integer, primary_key=True)
    i_name = Column(String(64))
    amount = Column(Float, nullable=False)
    date = Column(Date, nullable=True)
    stuff_id = Column(Integer, ForeignKey('stuff_account.id'), nullable=False)
    stuff = relationship('Stuff', back_populates='incomes')
# 收入表

class Income(Base):
    __tablename__ = 'income_record'

    id = Column(Integer, primary_key=True)
    i_name = Column(String(64))
    amount = Column(Float, nullable=False)
    date = Column(Date, nullable=True)
    stuff_id = Column(Integer, ForeignKey('stuff_account.id'), nullable=False)
    stuff = relationship('Stuff', back_populates='incomes')

解析器

python
def parser(rows:list, sheet_name:str, name_idx:int, amount_idx:int, i_date:date) -> list:
    records = list()
    for row in rows:
        stuff_name = row[name_idx]
        amount = row[amount_idx]
        record = {
            'stuff_name': stuff_name,
            'data': Income(
                i_name=sheet_name,
                amount=amount,
                i_date=date
            )
        }
        records.append(record)
    return records
def parser(rows:list, sheet_name:str, name_idx:int, amount_idx:int, i_date:date) -> list:
    records = list()
    for row in rows:
        stuff_name = row[name_idx]
        amount = row[amount_idx]
        record = {
            'stuff_name': stuff_name,
            'data': Income(
                i_name=sheet_name,
                amount=amount,
                i_date=date
            )
        }
        records.append(record)
    return records

输出

python
def export(year:int, sheet_names:list, dest_file:str) -> None:
    wb = Workbook()
    ws1 = wb.active
    ws1.title = '收入汇总' # 汇总工作簿的汇总工作表
    for sheet_name in sheet_names:
        ws = wb.create_sheet(title=sheet_name)
        ws['A1'] = sheet_name
        ws['A2'] = '序号'
        ws['A3'] = '姓名'
        for month in range(1, 13): # 按月分设置表头
            row_idx = 3 # 行纵坐标
            column_idx = month + 2 # 列横坐标
            ws.cell(row=2, column=column_idx, value=f'{str(month)}月')
            # 根据标头填充行
            for stuff in session.query(Stuff).order_by(Stuff.name):
                record = session.query(Income.amount).filter_by(
                stuff_id=stuff.id, date=date(year, month, 1), i_name=sheet_name).first()
                ws.cell(row=row_idx, column=1, value=row_idx-2) # 写序号
                ws.cell(row=row_idx, column=2, value=stuff.name) # 写姓名
                ws.cell(row=row_idx, column=column_idx, value=record[0] if record else 0) # 写金额数据
    wb.save(filename=dest_file)
def export(year:int, sheet_names:list, dest_file:str) -> None:
    wb = Workbook()
    ws1 = wb.active
    ws1.title = '收入汇总' # 汇总工作簿的汇总工作表
    for sheet_name in sheet_names:
        ws = wb.create_sheet(title=sheet_name)
        ws['A1'] = sheet_name
        ws['A2'] = '序号'
        ws['A3'] = '姓名'
        for month in range(1, 13): # 按月分设置表头
            row_idx = 3 # 行纵坐标
            column_idx = month + 2 # 列横坐标
            ws.cell(row=2, column=column_idx, value=f'{str(month)}月')
            # 根据标头填充行
            for stuff in session.query(Stuff).order_by(Stuff.name):
                record = session.query(Income.amount).filter_by(
                stuff_id=stuff.id, date=date(year, month, 1), i_name=sheet_name).first()
                ws.cell(row=row_idx, column=1, value=row_idx-2) # 写序号
                ws.cell(row=row_idx, column=2, value=stuff.name) # 写姓名
                ws.cell(row=row_idx, column=column_idx, value=record[0] if record else 0) # 写金额数据
    wb.save(filename=dest_file)

一些插曲和思考

上述方法能顺利地跑起来,是基于绝对理想的原始数据。在实际运行过程中,经历了反复的调试,归根结底就是数据清洗

几个例子:

  1. 一张表格中,出现同名行。同名问题要分两种情况处理,一是客观同名,那么就要做 name 字段的区分,最常见的就是同名名字后面加上(女)或这(小);二是错误同名,需要进一步核实准确数据,成本极高,我的解决方案是时间戳重命名:

    python
    from time import time
    stuff_name = stuff_name.replace(' ', '') # 删掉姓名中的空格
    if stuff_name in map(lambda r: r['stuff_name'], records):
        stuff_name += str(time())[-7:-1]
    
    from time import time
    stuff_name = stuff_name.replace(' ', '') # 删掉姓名中的空格
    if stuff_name in map(lambda r: r['stuff_name'], records):
        stuff_name += str(time())[-7:-1]
    

    这种方法相当于搁置错误同名,基本上可以避免多次重名的情况,但后续还需人工修正。

  2. 若干表格中的同一个人,部分名字为错别字。因为脚本会将错别字名字作为另一个人来处理,所以运行不会报错,也不会影响部分统计结果。这种情况一般仅能通过肉眼检查发现,成本较高。

  3. 数据库更新。Stuff 表的增删;Income.amount 的运算等等。

完成这个项目,其实大部分的时间都用于数据清洗。令人纠结的是,对于一个体量不大的数据集,做一次性的数据分析,耗费大量时间精力去做数据清洗,甚至与人肉分析相当,那去写一个脚本的意义何在呢?

不过有人说“懒惰是人类文明进步的真正动力”,与其无尽地复制粘贴、统计求和,不如让计算机去完成它擅长的工作。

Powered by VitePress.