金蝶云ERP +Tableau 实践:以 API构建企业级数据仓库(上)

金蝶ERP + Tableau 实战:以Python+PostgreSQL构建数据仓库。涵盖API调用、元数据解析、采购订单表处理等全流程,真实零售客户项目经验总结。

最近老客户推荐了新客户,让我基于金蝶 ERP 构建企业的分析中心,相比Sap、Monitor ERP 等成熟 ERP,金蝶 ERP 最大的问题在于如何及时、准确地获得数据。

坐拥全国这么多的云端客户,金蝶甚至都不给客户开放数据分析所需要的数据库权限!API 数据每日限制只有5万行 ,这在保证系统稳定性的同时,也限制了客户的手脚,限制了系统的持续改进。

为此,在购买 API2DataBase 的系统之前,我不得不使用 Python 先自己尝试获得一个数据表,开始探索之路。

一、构建平台的关系

金蝶 ERPTableau 之间,很明显需要一个“数据仓库”的实体存在。测试阶段,这个数据仓库可以是我本地的 PostgreSQL 或者 MySQL 数据库,正式交付客户时,则将是基于云服务的 Postgresql 云数据库。

图示展示了金蝶ERP与API、PostgreSQL数据库以及Tableau BI之间的关系,涉及数据流和处理步骤。

之所以使用 Postgresql 而非 MySQL,还有一个原因是是,前者可以更好地兼容 json 数据格式,特别是9.4引入了 jsonb 类型,它存储二进制格式,具有更好的查询性能。

由于金蝶API 可能随着时间变化,因为我计划将 元数据JSON 数据(多层嵌套结构)先存储到数据库,根据元数据信息,查询相关数据表信息并写入数据库,通过存储过程(如 PL/pgSQL)或视图来构建一层抽象,用于数据处理(如解析嵌套字段)和分析(如过滤 MustInput=1 的字段、统计 FieldType 等)。

这是一个典型的 ETL/分析流程,数据库选择需要平衡 JSON 存储的灵活性、查询效率、一致性和维护成本。

注:2026年二季度开始,将接受“金蝶云星空/金蝶云苍穹”的新客户项目,并以此为基础筹划“供应链分析主题最佳实践”的图书,有兴趣的客户或个人欢迎提前联系。 @admin (wyp@vizwise.cn)

一张展示金蝶 ERP、PostgreSQL 数据库及 Tableau 服务器的架构示意图,包含 API 同步与数据分析组件,图中标注了相关服务器和网络配置。

二、数据采集过程(简化版本)

这里以我正在做的“采购订单表”为例。

1、API 的使用说明

金蝶的采购订单为例,对应的 formid = PUR_PurchaseOrder。如下图所示,展示了金蝶 API 官方测试的示例,可以查询采购订单前2000行数据。

显示金蝶云系统的 API 测试界面,包含请求参数和结果展示,界面上突出显示了主要 ID 和请求字段的输入限制。

查询结果如下所示:

[
  [
    "2022-03-24T00:00:00",
    "CGDD000001"
  ],
  [
    "2022-03-24T00:00:00",
    "CGDD000002"
  ],
  [
    "2022-03-24T00:00:00",
    "CGDD000003"
  ],
……  
]
{
  "FormId": "PUR_PurchaseOrder",
  "FieldKeys": "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FPurchaseDeptId,FPurchaserGroupId,FPurchaserId,FProviderId,FSettleId,FChargeId,FChangeStatus,FACCTYPE,FSettleModeId,FPayConditionId,FSettleCurrId,FExchangeTypeId,FExchangeRate,FPriceTimePoint,FFOCUSSETTLEORGID,FProductType,FIsIncludedTax,FISPRICEEXCLUDETAX,FLocalCurrId,FMaterialId,FMaterialDesc,FUnitId,FQty,FPriceUnitId,FPriceUnitQty,FPriceBaseQty,FDeliveryDate,FPrice,FTaxPrice,FEntryTaxRate,FRequireOrgId,FRequireDeptId,FReceiveOrgId,FEntrySettleOrgId,FStockUnitID,FStockQty,FStockBaseQty,FSupplierLot,FDeliveryMaxQty,FDeliveryMinQty,FDeliveryEarlyDate,FDeliveryLastDate,FPriceCoefficient,FEntrySettleModeId,FReqTraceNo,FPlanConfirm,FSalUnitID,FSalQty,FCentSettleOrgId,FDispSettleOrgId,FDeliveryStockStatus,FSalBaseQty,FEntryPayOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT",

  "FilterString": "FDate = '2022-03-24T00:00:00' ", 
  "OrderString": "",
  "TopRowCount": 0,
  "StartRow": 0,
  "Limit": 10,
  "SubSystemId": ""
  
}

2、API 获得订单元数据返回 json 的解析

如果只是查询几个字段,大可不必元数据,只需要指定字段名称,然后查询、保存即可。如下所示:

para = {
        "FormId": "PUR_PurchaseOrder",
        "FieldKeys": "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT",
        "FilterString": "FDate = '2024-03-24T00:00:00'",  # [],  #
        "OrderString": "",
        "TopRowCount": 0,
        "StartRow": 0,
        "Limit": 200,
        "SubSystemId": ""
    }
    response = api_sdk.ExecuteBillQuery(para)
    print("test:",response)
res=  [
['CGDD001666', '6d01d059713d42a28bb976c90a121142', '2025-07-02T00:00:00', 136496, 1, 102900, 0, 106815, 136496, 136496, 136496, 'A', 'Q', 0, 118221, 1, 1, 1.0, '1', 0, '1', True, True, 1, 137700, 'YY【】XXXXX/袋001', 103818, 1160.0, 103818, 1160.0, 1160.0, '2025-07-10T00:00:00', 3.663717, 4.14, 13.0, 1, 0, 1, 1, 103818, 1160.0, 1160.0, ' ', 1160.0, 1160.0, '2025-07-10T00:00:00', '2025-07-10T23:59:59', 1.0, 0, ' ', True, 103818, 1160.0, 0, 0, 10001, 1160.0, 0, 4802.4, 1160.0, '2025-07-10T00:00:00', '2025-07-10T00:00:00', 50.0, 2401.2], 
['CGDD001666', '6d01d059713d42a28bb976c90a121142', '2025-07-02T00:00:00', 136496, 1, 102900, 0, 106815, 136496, 136496, 136496, 'A', 'Q', 0, 118221, 1, 1, 1.0, '1', 0, '1', True, True, 1, 137700, 'YY【】XXXXX/袋001', 103818, 1160.0, 103818, 1160.0, 1160.0, '2025-07-10T00:00:00', 3.663717, 4.14, 13.0, 1, 0, 1, 1, 103818, 1160.0, 1160.0, ' ', 1160.0, 1160.0, '2025-07-10T00:00:00', '2025-07-10T23:59:59', 1.0, 0, ' ', True, 103818, 1160.0, 0, 0, 10001, 1160.0, 0, 4802.4, 1160.0, '2025-07-10T00:00:00', '2025-07-10T00:00:00', 50.0, 2401.2]
]
api_sdk = K3CloudApiSdk("https://XXXX.ik3cloud.com/k3cloud/")
    api_sdk.Init(config_path='./conf.ini', config_node='config')

    # 注意:这里的FieldKeys是硬编码的,如果元数据变化需要同步修改
    # FBillNo 在索引 0, FMaterialDesc 在索引 25
    field_keys_str = "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FPurchaseDeptId,FPurchaserGroupId,FPurchaserId,FProviderId,FSettleId,FChargeId,FChangeStatus,FACCTYPE,FSettleModeId,FPayConditionId,FSettleCurrId,FExchangeTypeId,FExchangeRate,FPriceTimePoint,FFOCUSSETTLEORGID,FProductType,FIsIncludedTax,FISPRICEEXCLUDETAX,FLocalCurrId,FMaterialId,FMaterialDesc,FUnitId,FQty,FPriceUnitId,FPriceUnitQty,FPriceBaseQty,FDeliveryDate,FPrice,FTaxPrice,FEntryTaxRate,FRequireOrgId,FRequireDeptId,FReceiveOrgId,FEntrySettleOrgId,FStockUnitID,FStockQty,FStockBaseQty,FSupplierLot,FDeliveryMaxQty,FDeliveryMinQty,FDeliveryEarlyDate,FDeliveryLastDate,FPriceCoefficient,FEntrySettleModeId,FReqTraceNo,FPlanConfirm,FSalUnitID,FSalQty,FCentSettleOrgId,FDispSettleOrgId,FDeliveryStockStatus,FSalBaseQty,FEntryPayOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT"

    para = {
        "FormId": form_id,
        "FieldKeys": field_keys_str,
        "FilterString": "FDate = '2024-03-24T00:00:00'",  
        "OrderString": "", 
        "TopRowCount": 0, 
        "StartRow": 0, 
        "Limit": 2000, 
        "SubSystemId": ""
    }

    print(f"   {start_date_str} 查询完成,共 {len(all_rows)} 条记录,准备写入数据库...")
    inserted_count = insert_data(cur, conn, all_rows, field_list, table_name, schema)
    # print(f"成功将 {inserted_count} 条数据写入表 {table_name}")

3、API 的改进:函数和循环

关键在于,如何使用函数简化操作,同时使用循环提高效率。

比如 ,我的一个主程序文件是这样的

#!/usr/bin/python
# -*- coding:GBK -*-
import kingdee_utils
from datetime import datetime, timedelta

def main():
    schema = 'kingdee'
    form_id = "PUR_PurchaseOrder"
    table_name = form_id + "0817"  # 建议加上日期或版本号以区分

    # 1. 查询元数据
    # 注意:元数据字段顺序必须和 fetch_and_insert_daily_data 中 FieldKeys 的顺序严格一致
    # 这里我们假设是一致的,如果API返回字段顺序不确定,需要做额外匹配处理
    field_list = kingdee_utils.query_metadata(form_id)
    if not field_list:
        print("未获取到元数据,程序退出。")
        return

    # 2. 数据库连接和建表
    conn = kingdee_utils.get_db_connection()
    cur = conn.cursor()
    kingdee_utils.create_table_if_not_exists(cur, table_name, field_list, schema)
    conn.commit()

    # 3. 按日期循环处理数据
    start_date = datetime(2025, 7, 11)
    total_days = 5
    total_inserted = 0

    for day in range(total_days):
        current_date = start_date + timedelta(days=day)
        date_str = current_date.strftime('%Y-%m-%dT00:00:00')
        print(f"\n======== 开始处理日期: {current_date.strftime('%Y-%m-%d')} ========")

        # 调用新的核心函数,完成“获取并写入”的全部操作
        inserted_count = kingdee_utils.fetch_and_insert_daily_data(
            cur, conn, form_id, table_name, field_list, schema, date_str
        )
        total_inserted += inserted_count
        print(f"======== 日期 {current_date.strftime('%Y-%m-%d')} 处理完毕, 当日插入 {inserted_count} 条数据 ========")

    # 4. 关闭连接
    cur.close()
    conn.close()
    print(f"\n所有任务完成,总计插入 {total_inserted} 条采购订单数据。")


if __name__ == '__main__':
    main()
#!/usr/bin/python
# -*- coding:GBK -*-
# 日期2025/08/16 更新 by Gemini
import json
import psycopg2
from psycopg2.extras import execute_values
from k3cloud_webapi_sdk.main import K3CloudApiSdk
from datetime import datetime


def get_db_connection(schema='kingdee'):
    """获取数据库连接并设置 schema"""
    try:
        conn = psycopg2.connect(
            dbname='postgres',
            user='postgres',
            password='XXXXXX',
            host='localhost',
            port=5432
        )
        cur = conn.cursor()
        cur.execute(f"SET search_path TO {schema};")
        conn.commit()
        print(f"成功连接到数据库,设置 schema 为 {schema}")
        return conn
    except psycopg2.Error as e:
        print(f"数据库连接失败: {e}")
        raise


def check_table_existence(cur, table_name, schema='kingdee'):
    """检查表是否存在"""
    try:
        cur.execute("""
            SELECT EXISTS (
                SELECT 1
                FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
        """, (schema, table_name))
        exists = cur.fetchone()[0]
        print(f"检查表 '{schema}.{table_name}' 是否存在: {'是' if exists else '否'}")
        return exists
    except psycopg2.Error as e:
        print(f"检查表是否存在时出错: {e}")
        return False


def create_table_if_not_exists(cur, table_name, field_list, schema='kingdee'):
    """如果表不存在,动态创建表"""
    if check_table_existence(cur, table_name, schema):
        print(f"表 {schema}.{table_name} 已存在,跳过创建")
        return
    print(f"表 {schema}.{table_name} 不存在,正在创建...")
    columns = []
    for field in field_list:
        col_name = field.get('Key', '')
        data_type = field.get('FieldType', 'String')
        pg_type = 'VARCHAR(255)'  # 默认类型
        if data_type in ['integer', 'number', 'decimal', 'float']:
            pg_type = 'NUMERIC'
        elif data_type == 'datetime':
            pg_type = 'TIMESTAMP'
        elif data_type == 'date':
            pg_type = 'DATE'
        elif data_type in ['boolean', 'bool']:
            pg_type = 'BOOLEAN'
        col_def = f'"{col_name}" {pg_type}'  # 使用双引号以支持大小写混合的列名
        columns.append(col_def)
    columns.append('"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
    create_table_query = f'CREATE TABLE {schema}."{table_name}" ({", ".join(columns)})'
    try:
        cur.execute(create_table_query)
        print(f"创建表 {schema}.{table_name} 成功")
    except psycopg2.Error as e:
        print(f"创建表失败: {e}")
        raise

def _map_kingdee_field_type(field_type_code):
    """
    将 Kingdee 的整数 FieldType 代码映射为描述性字符串。
    此映射可能需要根据您的 Kingdee 系统具体情况进行扩展。
    """
    # 基于您提供的示例和通用类型进行映射
    mapping = {
        # 文本类型
        231: 'String',  # 例如: FBillNo
        175: 'String',  # 例如: FDocumentStatus (枚举类型)
        # 日期/时间类型
        61: 'Date',     # 例如: FDate
        # 基础资料/引用类型 (作为字符串处理)
        127: 'String',  # 例如: FPurchaseOrgId, FSupplierId
        # 数值类型 (需要根据实际情况补充)
        107: 'Decimal', # 假设 107 是数值类型
        # 其他类型...
    }
    # 如果代码未在映射中找到,则默认为字符串类型
    return mapping.get(field_type_code, 'String')


def query_metadata(form_id):
    """查询元数据并返回字段列表"""
    api_sdk = K3CloudApiSdk("https://XXXXX.ik3cloud.com/k3cloud/")
    api_sdk.Init(config_path='./conf.ini', config_node='config')
    para = {"FormId": form_id}
    response = api_sdk.QueryBusinessInfo(para)
    print("1 - 正在解析元数据...")
    try:
        res = json.loads(response)
        if not res.get('Result', {}).get('ResponseStatus', {}).get('IsSuccess', False):
            print("  元数据查询失败:", res.get('Result', {}).get('ResponseStatus', {}).get('Errors', []))
            return []

        need_return_data = res.get('Result', {}).get('NeedReturnData', {})

        # 遍历所有 Entrys (如单据头、单据体),收集所有字段
        all_fields = []
        for entry in need_return_data.get('Entrys', []):
            all_fields.extend(entry.get('Fields', []))

        if not all_fields:
            print("  元数据中未找到任何字段。")
            return []

        # 构建最终的字段列表
        field_list = []
        for field in all_fields:
            if field.get('Key'):
                field_list.append({
                    'Key': field.get('Key'),
                    'Name': next((item['Value'] for item in field.get('Name', []) if item['Key'] == 2052),
                                 field.get('Key')),
                    'FieldType': _map_kingdee_field_type(field.get('FieldType'))
                })

        print(f"  元数据解析成功,共找到 {len(field_list)} 个字段。")
        return field_list
    except (json.JSONDecodeError, KeyError) as e:
        print(f"元数据解析错误: {e}")
        return []


def preprocess_row(row, field_keys, field_list):
    """预处理行数据,将值转换为与字段类型匹配的格式"""
    processed_row = []
    field_types = {f['Key']: f['FieldType'] for f in field_list}
    for i, key in enumerate(field_keys):
        value = row[i] if i < len(row) else None
        field_type = field_types.get(key, 'string')
        if isinstance(value, dict):
            value = value.get('FNumber', value.get('FName', ''))
        elif value is None:
            pass
        elif field_type in ['integer', 'number', 'decimal', 'float']:
            value = float(value) if value not in [None, ''] else None
        elif field_type in ['boolean', 'bool']:
            value = str(value).lower() == 'true'
        elif value == '':
            value = None
        else:
            value = str(value)
        processed_row.append(value)
    return tuple(processed_row)


def insert_data(cur, conn, rows, field_list, table_name, schema):
    """将数据批量插入到数据库"""
    if not rows:
        return 0
    field_keys = [field['Key'] for field in field_list]
    columns = [f'"{key}"' for key in field_keys]  # 使用双引号

    insert_query = f"""
        INSERT INTO {schema}."{table_name}" ({', '.join(columns)}, "created_at")
        VALUES %s
    """
    try:
        processed_rows = [preprocess_row(row, field_keys, field_list) + (datetime.now(),) for row in rows]
        execute_values(cur, insert_query, processed_rows)
        conn.commit()
        return len(processed_rows)
    except psycopg2.Error as e:
        print(f"插入数据失败: {e}")
        conn.rollback()
        return 0


def fetch_and_insert_daily_data(cur, conn, form_id, table_name, field_list, schema, start_date_str):
    """
    核心函数:获取指定单据和单日的所有数据,并直接写入数据库。
    - 动态生成查询字段
    - 自动处理分页。
    - 打印订单号和产品名称日志。
    - 仅在获取到数据时执行写入。
    """
    api_sdk = K3CloudApiSdk("https://XXXXX.ik3cloud.com/k3cloud/")
    api_sdk.Init(config_path='./conf.ini', config_node='config')

    # 从元数据动态生成字段列表和查询字符串
    if not field_list:
        print("错误: 字段列表为空,无法执行查询。")
        return 0

    field_keys = [field['Key'] for field in field_list]
    field_keys_str = ",".join(field_keys)
    # print("fields list:",field_keys_str)

    para = {
        "FormId": form_id,
        "FieldKeys": field_keys_str,
        "FilterString": f"FDate = '{start_date_str}'",
        "OrderString": "", "TopRowCount": 0, "StartRow": 0, "Limit": 2000, "SubSystemId": ""
    }
    print(f"开始查询日期: {start_date_str}")
    all_rows = []
    while True:
        try:
            response = api_sdk.ExecuteBillQuery(para)
            res = json.loads(response)
            print("res",res)
            if not res:
                break
            for row in res:
                # 查找 FBillNo 和 FMaterialDesc 的索引用于日志记录
                order_no = row[0]
                product_name = row[8]
                print(f"  获取到数据 -> 订单号: {order_no}, 时间: {product_name}")
                all_rows.append(row)

            if len(res) < para["Limit"]:
                break

            para["StartRow"] += para["Limit"]
            print(f"  分页查询,下一页起始行: {para['StartRow']}")

        except (json.JSONDecodeError, Exception) as e:
            print(f"查询或解析数据时出错: {e}")
            break

    if not all_rows:
        print(f"日期 {start_date_str} 没有查询到任何数据,跳过写入。")
        return 0

    print(f"日期 {start_date_str} 查询完成,共 {len(all_rows)} 条记录,准备写入数据库...")
    inserted_count = insert_data(cur, conn, all_rows, field_list, table_name, schema)
    print(f"成功将 {inserted_count} 条数据写入表 {table_name}")
    return inserted_count

4、使用元数据优化建表逻辑和排错

比如,我在查询条码数据时提示,

api_sdk.ExecuteBillQuery(para) =  
[[{'Result': 
   {'ResponseStatus': 
      {'ErrorCode': 500, 'IsSuccess': False, 'Errors': 
        [{'FieldName': None, 'Message': '名称为“条码管理”的模块/子系统未购买', 'DIndex': 0}], 'SuccessEntitys': [], 'SuccessMessages': [], 'MsgCode': 7}
}}]]
__BLOCK_p__金蝶云星空的数据库表关联关系类型:

001、单据头拆分表

单据头的拆分表引用单据头所在表的主键

例如:T_AP_PAYABLE_O.FID=T_AP_PAYABLE.FID

002、单据体拆分表

单据体的拆分表引用单据体所在表的主键;同时引用单据体的父实体的主键

例如:T_AP_PAYABLEENTRY_O.Fentryid=T_AP_PAYABLEENTRY.fentryid

例如:T_AP_PAYABLEENTRY_O.fid=T_AP_PAYABLE.fid

003、多语言表

多语言表引用所在实体的表的主键

例如:T_AP_PAYABLEENTRY_L.fentryid=T_AP_PAYABLEENTRY.fentryid

004、单据体

单据体引用单据头的主键

例如:T_AP_PAYABLEENTRY.FID=T_AP_PAYABLE.FID

005、单据头关联凭证表(VH表)

单据头关联凭证表引用单据头的主键

例如:T_AP_PAYABLE_VH.Fid=T_AP_PAYABLE.FID

006、子单据体

子单据体引用单据体的主键

例如:T_AP_PAYABLETAX.fentryid=T_AP_PAYABLEenty.fentryid

作者:夏天的云儿

来源:金蝶云社区

原文链接:https://vip.kingdee.com/article/618381625222396160?productLineId=1&lang=zh-CN

著作权归作者所有。未经允许禁止转载,如需转载请联系作者获得授权。

借助于元数据的结构,可以优化建表逻辑和查询过程。以物料信息为例,json 的 entry 节点中有很多个分类,简化期间,我只保留了前面五个分类信息。

但是要注意,如果保留所有的 Entry 节点,数据可能重复重复,这让我很震惊。

比如,我在查询 User 用户时,如果保留所有元数据条目 (Entries): ['FBillHead', 'FOrgInfo', 'FRoleInfo'],就会出现一个用户对应多个OrgInfo 和多个 RoleInfo 之中。 这个就需要通过设置主键或其他方式避免。

为了帮助检查 Entry 的数量,我在查询时让代码抛出 Entry 节点的名称以备检查,并默认保留前面5个节点。

1 - 正在解析元数据...
  发现的元数据条目 (Entries): ['FBillHead', 'FSubHeadEntity', 'FBarCodeEntity_CMK', 'FSpecialAttributeEntity', 'SubHeadEntity', 'SubHeadEntity1', 'SubHeadEntity2', 'SubHeadEntity3', 'SubHeadEntity4', 'SubHeadEntity5', 'FEntityAuxPty', 'FEntityInvPty', 'SubHeadEntity7', 'SubHeadEntity6']
  将只处理以下条目中的字段: ['FBillHead', 'FSubHeadEntity', 'FBarCodeEntity_CMK', 'FSpecialAttributeEntity', 'SubHeadEntity']
  元数据解析成功,共找到 123 个字段。
[{
	'Key': 'FBillNo',
	'Name': [{
		'Key': 2052,
		'Value': '单据编号'
	}, {
		'Key': 1033,
		'Value': 'Doc No.'
	}, {
		'Key': 3076,
		'Value': '單據編號'
	}],
	'FieldName': 'FBILLNO',
	'PropertyName': 'BillNo',
	'FieldType': 231,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 12,
	'MustInput': 0,
	'LookUpObjectFormId': None,
	'LookUpObjectID': None,
	'EnumObjectId': None,
	'Extends': None,
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': '',
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': False,
	'IsEditLock': False,
	'Editlen': 30,
	'ConditionType': '0'
}, {
	'Key': 'FDocumentStatus',
	'Name': [{
		'Key': 2052,
		'Value': '单据状态'
	}, {
		'Key': 1033,
		'Value': 'Doc Status'
	}, {
		'Key': 3076,
		'Value': '單據狀態'
	}],
	'FieldName': 'FDOCUMENTSTATUS',
	'PropertyName': 'DocumentStatus',
	'FieldType': 175,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 40,
	'MustInput': 0,
	'LookUpObjectFormId': None,
	'LookUpObjectID': None,
	'EnumObjectId': None,
	'Extends': [{
		'Value': 'Z',
		'Caption': '暂存',
		'Seq': 0,
		'Invalid': False
	}, {
		'Value': 'A',
		'Caption': '创建',
		'Seq': 2,
		'Invalid': False
	}, {
		'Value': 'B',
		'Caption': '审核中',
		'Seq': 3,
		'Invalid': False
	}, {
		'Value': 'C',
		'Caption': '已审核',
		'Seq': 4,
		'Invalid': False
	}, {
		'Value': 'D',
		'Caption': '重新审核',
		'Seq': 5,
		'Invalid': False
	}],
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': 'Z',
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': True,
	'IsEditLock': True,
	'Editlen': 0,
	'ConditionType': '9'
}, {
	'Key': 'FStockOrgId',
	'Name': [{
		'Key': 2052,
		'Value': '收料组织'
	}, {
		'Key': 1033,
		'Value': 'Receipt Org.'
	}, {
		'Key': 3076,
		'Value': '收料組織'
	}],
	'FieldName': 'FSTOCKORGID',
	'PropertyName': 'StockOrgId',
	'FieldType': 127,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 7,
	'MustInput': 1,
	'LookUpObjectFormId': 'ORG_Organizations',
	'LookUpObjectID': '3f8da3f4-fe84-4456-9548-7c2c38b7c57f',
	'EnumObjectId': None,
	'Extends': None,
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': '',
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': False,
	'IsEditLock': False,
	'Editlen': 0,
	'ConditionType': '0,20'
}, {
	'Key': 'FDate',
	'Name': [{
		'Key': 2052,
		'Value': '入库日期'
	}, {
		'Key': 1033,
		'Value': 'Warehouse Receipt Date'
	}, {
		'Key': 3076,
		'Value': '入庫日期'
	}],
	'FieldName': 'FDATE',
	'PropertyName': 'Date',
	'FieldType': 61,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 4,
	'MustInput': 1,
	'LookUpObjectFormId': None,
	'LookUpObjectID': None,
	'EnumObjectId': None,
	'Extends': None,
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': None,
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': False,
	'IsEditLock': False,
	'Editlen': 0,
	'ConditionType': '2'
}, 
…………………………
__BLOCK_p__作为单一主键的数据库数据类型有以下6种:1、smallintsystem_type_id=52 举例:T_BAS_NUMBER.FID2、intsystem_type_id=56 举例:T_BAS_ITEM.FITEMID3、bigintsystem_type_id=127 举例:T_SAL_INITOUTSTOCK.FID4、varcharsystem_type_id=167 举例:T_BAS_PUBNEEDS.FID5、charsystem_type_id=175 举例:CMK_BD_KTMDELETELOG.FID6、nvarcharsystem_type_id=231 举例: T_AM_NETCONTROL.FID作者:i求知若渴来源:金蝶云社区原文链接:https://vip.kingdee.com/article/61129193165550592?productLineId=1&lang=zh-CN著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

三、企业级 DW 中心开发

当然,上面是我个人用 Python 完成的测试,最终开发使用了更加健壮的 java 完成。并结合定时任务实现高效的增量刷新。

A snapshot of system logs showing request URLs and timestamps related to an API execution.

目前,我用 java 完成了采购及部分主题的数据表查询,这样就建立了后续 BI 的分析基础,并且尽可能不影响业务系统。

补充:我的测试数据一览

A screenshot showing database assets, including a table list, schema information, and data statistics from Kingdee cloud, utilizing API for querying and data storage.

四、最精彩的部分:Tableau 分析

分析始于建模,建模的典型是关系模型(relationship)。

如图所示,展示了采购订单和供应商信息的基本模型,后续可以增加物料等基本信息,构成典型的单事实多维度模型。

图示展示了采购订单与物料明细的关系模型,左侧为连接信息,右侧为物料详细数据。

模型之后就是仪表板啦,让大家看看近期的作品(列表)吧。

Interface of Tableau DW/BI analysis system showing various dashboards and reports.

注:客户数据不便公开。

2025/09/30 update by 喜乐君

No comments yet