以下是创建名为 tmp_withdraw_pass
的表的 SQL 语句:
SQLCREATE TABLE tmp_withdraw_pass (
withdraw_id VARCHAR(255),
app_name VARCHAR(255),
day_num INT,
updated_at TIMESTAMP
);
该语句将创建一个名为 tmp_withdraw_pass
的表,包含四个字段:
withdraw_id
:字符串类型,最大长度为 255。app_name
:字符串类型,最大长度为 255。day_num
:整数类型,用于存储天数。updated_at
:时间戳类型,表示最后更新时间。
请根据需要调整字段的数据类型和最大长度。
如果您需要在 PostgreSQL 数据库中创建该表,请使用以下 SQL 语句:
SQLCREATE TABLE tmp_withdraw_pass (
withdraw_id VARCHAR(255),
app_name VARCHAR(255),
day_num INTEGER,
updated_at TIMESTAMP
);
与上面的 SQL 语句相比,这个语句的唯一不同之处是 day_num
字段的数据类型使用了 PostgreSQL 中的 INTEGER
类型,而不是 MySQL 中的 INT
类型。此外,updated_at
字段使用的是 PostgreSQL 默认的 TIMESTAMP
数据类型,而不是 MySQL 中的 DATETIME
数据类型。
以下是一个 SQL 查询语句,用于将 tmp_withdraw_pass
表中的 day_num
字段平均分成十份并进行更新。更新后,第一组记录 day_num 字段的值为 1,第二组为 2,以此类推。在更新之前,该查询会首先根据 withdraw_id
字段进行排序,然后再按顺序将记录分成十个相等的部分。
SQLWITH sorted_withdrawals AS (
SELECT withdraw_id, app_name, day_num, updated_at,
ROW_NUMBER() OVER (ORDER BY withdraw_id) as row_num,
COUNT(*) OVER() as total_count
FROM tmp_withdraw_pass
),
updated_withdrawals AS (
SELECT withdraw_id, app_name,
CEIL(row_num * 10.0 / total_count) AS new_day_num,
updated_at
FROM sorted_withdrawals
)
UPDATE tmp_withdraw_pass
SET day_num = u.new_day_num
FROM updated_withdrawals u
WHERE tmp_withdraw_pass.withdraw_id = u.withdraw_id;
该查询使用了一个 WITH
子句,其中包含两个公共表表达式。第一个表达式 sorted_withdrawals
用于对原始表按 withdraw_id
字段进行排序并为每个记录计算行号和总记录数。第二个表达式 updated_withdrawals
用于根据行号和总记录数计算更新后的 day_num
值。
随后,使用 UPDATE
语句将计算出的 day_num
值更新到原始表中。在更新过程中,我们使用 FROM
子句引用了 updated_withdrawals
表,以便将新值与原始表中的相应记录匹配。
以下是一个改写后的 SQL 查询语句,它在按 app_name
字段进行分组的基础上,将 day_num
字段平均分成十份并进行更新。更新后,第一组记录 day_num
字段的值为 1,第二组为 2,以此类推。在更新之前,该查询会首先根据 withdraw_id
字段进行排序,然后再按顺序将记录分成十个相等的部分。
SQLWITH sorted_withdrawals AS (
SELECT withdraw_id, app_name, day_num, updated_at,
ROW_NUMBER() OVER (PARTITION BY app_name ORDER BY withdraw_id) as row_num,
COUNT(*) OVER(PARTITION BY app_name) as total_count
FROM tmp_withdraw_pass
),
updated_withdrawals AS (
SELECT withdraw_id, app_name,
CEIL(row_num * 10.0 / total_count) AS new_day_num,
updated_at
FROM sorted_withdrawals
)
UPDATE tmp_withdraw_pass
SET day_num = u.new_day_num
FROM updated_withdrawals u
WHERE tmp_withdraw_pass.withdraw_id = u.withdraw_id
AND tmp_withdraw_pass.app_name = u.app_name;
该查询与之前提供的 SQL 查询语句类似,只是添加了 PARTITION BY
子句,用于按 app_name
字段进行分组。在 sorted_withdrawals
公共表表达式中,我们使用 PARTITION BY
子句将记录按 app_name
字段分组,然后按 withdraw_id
字段进行排序。在 updated_withdrawals
公共表表达式中,我们仍然使用行号和总记录数计算更新后的 day_num
值。
在更新原始表时,我们在 WHERE
子句中添加了一个额外的条件,以确保只有相应 app_name
值的记录才会被更新。
下面是一个使用 psycopg2
库编写的 Python 脚本,它的作用是从数据库 A 的多张 withdraw_record
表中查询出 status
字段为 ''
的记录,并将这些记录插入到数据库 B 的 tmp_withdraw_pass
表中。在这个脚本中,我们将 withdraw_record
表分别映射到 app_name
值,通过循环遍历不同的 app_name
值,从相应的 withdraw_record
表中读取数据,然后将它们插入到 tmp_withdraw_pass
表中。
pythonimport psycopg2
# 数据库 A 的连接信息
A_DB_HOST = 'localhost'
A_DB_NAME = 'database_a'
A_DB_USER = 'user_a'
A_DB_PASSWORD = 'password_a'
# 数据库 B 的连接信息
B_DB_HOST = 'localhost'
B_DB_NAME = 'database_b'
B_DB_USER = 'user_b'
B_DB_PASSWORD = 'password_b'
# 各个 app_name 对应的 withdraw_record 表名
WITHDRAW_RECORD_TABLES = {
'app_name_1': 'withdraw_record_1',
'app_name_2': 'withdraw_record_2',
'app_name_3': 'withdraw_record_3',
# ...
}
# 连接数据库 A 和 B
a_conn = psycopg2.connect(host=A_DB_HOST, dbname=A_DB_NAME, user=A_DB_USER, password=A_DB_PASSWORD)
b_conn = psycopg2.connect(host=B_DB_HOST, dbname=B_DB_NAME, user=B_DB_USER, password=B_DB_PASSWORD)
# 循环遍历不同的 app_name 值
for app_name, table_name in WITHDRAW_RECORD_TABLES.items():
# 从数据库 A 中查询符合条件的记录
with a_conn.cursor() as a_cur:
a_cur.execute(f"SELECT withdraw_id, updated FROM {table_name} WHERE status = ''")
records = a_cur.fetchall()
# 将记录插入到数据库 B 的 tmp_withdraw_pass 表中
with b_conn.cursor() as b_cur:
for withdraw_id, updated in records:
b_cur.execute(
"INSERT INTO tmp_withdraw_pass (withdraw_id, app_name, day_num, updated_at) "
"VALUES (%s, %s, %s, %s)",
(withdraw_id, app_name, 0, updated)
)
# 提交事务
b_conn.commit()
# 关闭数据库连接
a_conn.close()
b_conn.close()
在这个脚本中,我们首先定义了数据库 A 和 B 的连接信息,以及不同的 app_name
值对应的 withdraw_record
表名。然后,我们使用 psycopg2
库连接到数据库 A 和 B。
接下来,我们使用 WITHDRAW_RECORD_TABLES
字典中的信息,循环遍历不同的 app_name
值。对于每个 app_name
值,我们从相应的 withdraw_record
表中查询符合条件的记录,然后将这些记录插入到数据库 B 的 tmp_withdraw_pass
表中。在插入记录时,我们将 day_num
字段设置为 0
,因为在更新时我们会将其更新为正确
好的,如果数据库实例 A 上的 dbname
不同,但是表名相同,那么可以将 WITHDRAW_RECORD_TABLES
字典改为只包含表名即可,如下所示:
pythonimport psycopg2
# 数据库 A 的连接信息
A_DB_HOST = 'localhost'
A_DB_NAME = 'database_a'
A_DB_USER = 'user_a'
A_DB_PASSWORD = 'password_a'
# 数据库 B 的连接信息
B_DB_HOST = 'localhost'
B_DB_NAME = 'database_b'
B_DB_USER = 'user_b'
B_DB_PASSWORD = 'password_b'
# 需要同步的表名
WITHDRAW_RECORD_TABLES = [
'withdraw_record',
# ...
]
# 连接数据库 A 和 B
a_conn = psycopg2.connect(host=A_DB_HOST, dbname=A_DB_NAME, user=A_DB_USER, password=A_DB_PASSWORD)
b_conn = psycopg2.connect(host=B_DB_HOST, dbname=B_DB_NAME, user=B_DB_USER, password=B_DB_PASSWORD)
# 循环遍历不同的表名
for table_name in WITHDRAW_RECORD_TABLES:
# 从数据库 A 中查询符合条件的记录
with a_conn.cursor() as a_cur:
a_cur.execute(f"SELECT withdraw_id, updated FROM {table_name} WHERE status = ''")
records = a_cur.fetchall()
# 将记录插入到数据库 B 的 tmp_withdraw_pass 表中
with b_conn.cursor() as b_cur:
for withdraw_id, updated in records:
b_cur.execute(
"INSERT INTO tmp_withdraw_pass (withdraw_id, app_name, day_num, updated_at) "
"VALUES (%s, %s, %s, %s)",
(withdraw_id, table_name, 0, updated)
)
# 提交事务
b_conn.commit()
# 关闭数据库连接
a_conn.close()
b_conn.close()
在这个脚本中,我们定义了数据库 A 和 B 的连接信息,以及需要同步的表名。然后,我们使用 psycopg2
库连接到数据库 A 和 B。
接下来,我们使用 WITHDRAW_RECORD_TABLES
列表中的信息,循环遍历不同的表名。对于每个表名,我们从相应的表中查询符合条件的记录,然后将这些记录插入到数据库 B 的 tmp_withdraw_pass
表中。在插入记录时,我们将 app_name
字段设置为表名,因为在同步到 tmp_withdraw_pass
表时,我们需要知道这些记录来自哪个表。在插入记录时,我们将 day_num
字段设置为 0
,因为在更新时我们会将其更新为正确的值。最后,我们提交事务并关闭数据库连接。
好的,如果需要从 A 实例同步多个不同数据库的 withdraw_record
表到 B 实例,那么可以按照以下方式编写 Python 脚本:
pythonimport psycopg2
# 数据库 A 的连接信息
A_DB_HOST = 'localhost'
A_DB_USER = 'user_a'
A_DB_PASSWORD = 'password_a'
# 数据库 B 的连接信息
B_DB_HOST = 'localhost'
B_DB_USER = 'user_b'
B_DB_PASSWORD = 'password_b'
# 需要同步的数据库名和表名
WITHDRAW_RECORD_TABLES = {
'database_a1': 'withdraw_record',
'database_a2': 'withdraw_record',
# ...
}
# 连接数据库 B
b_conn = psycopg2.connect(host=B_DB_HOST, user=B_DB_USER, password=B_DB_PASSWORD)
# 循环遍历不同的数据库和表名
for a_db_name, table_name in WITHDRAW_RECORD_TABLES.items():
# 连接数据库 A
a_conn = psycopg2.connect(host=A_DB_HOST, dbname=a_db_name, user=A_DB_USER, password=A_DB_PASSWORD)
# 从数据库 A 中查询符合条件的记录
with a_conn.cursor() as a_cur:
a_cur.execute(f"SELECT withdraw_id, updated FROM {table_name} WHERE status = ''")
records = a_cur.fetchall()
# 将记录插入到数据库 B 的 tmp_withdraw_pass 表中
with b_conn.cursor() as b_cur:
for withdraw_id, updated in records:
b_cur.execute(
"INSERT INTO tmp_withdraw_pass (withdraw_id, app_name, day_num, updated_at) "
"VALUES (%s, %s, %s, %s)",
(withdraw_id, table_name, 0, updated)
)
# 提交事务
b_conn.commit()
# 关闭数据库连接
a_conn.close()
# 关闭数据库连接
b_conn.close()
在这个脚本中,我们使用了一个字典 WITHDRAW_RECORD_TABLES
来表示需要同步的数据库名和表名。循环遍历这个字典,对于每个键值对,我们使用 psycopg2
库连接到相应的数据库 A。然后,我们从相应的 withdraw_record
表中查询符合条件的记录,将这些记录插入到数据库 B 的 tmp_withdraw_pass
表中。在插入记录时,我们将 app_name
字段设置为表名,因为在同步到 tmp_withdraw_pass
表时,我们需要知道这些记录来自哪个表。在插入记录时,我们将 day_num
字段设置为 0
,因为在更新时我们会将其更新为正确的值。最后,我们提交事务并关闭数据库连接。
好的,如果多个数据库中的表名都是 withdraw_record
,那么可以按照以下方式修改 Python 脚本:
pythonimport psycopg2
# 数据库 A 的连接信息
A_DB_HOST = 'localhost'
A_DB_USER = 'user_a'
A_DB_PASSWORD = 'password_a'
# 数据库 B 的连接信息
B_DB_HOST = 'localhost'
B_DB_USER = 'user_b'
B_DB_PASSWORD = 'password_b'
# 需要同步的数据库名
WITHDRAW_RECORD_DB_NAMES = [
'database_a1',
'database_a2',
# ...
]
# 表名
TABLE_NAME = 'withdraw_record'
# 连接数据库 B
b_conn = psycopg2.connect(host=B_DB_HOST, user=B_DB_USER, password=B_DB_PASSWORD)
# 循环遍历不同的数据库名
for a_db_name in WITHDRAW_RECORD_DB_NAMES:
# 连接数据库 A
a_conn = psycopg2.connect(host=A_DB_HOST, dbname=a_db_name, user=A_DB_USER, password=A_DB_PASSWORD)
# 从数据库 A 中查询符合条件的记录
with a_conn.cursor() as a_cur:
a_cur.execute(f"SELECT withdraw_id, updated FROM {TABLE_NAME} WHERE status = ''")
records = a_cur.fetchall()
# 将记录插入到数据库 B 的 tmp_withdraw_pass 表中
with b_conn.cursor() as b_cur:
for withdraw_id, updated in records:
b_cur.execute(
"INSERT INTO tmp_withdraw_pass (withdraw_id, app_name, day_num, updated_at) "
"VALUES (%s, %s, %s, %s)",
(withdraw_id, a_db_name, 0, updated)
)
# 提交事务
b_conn.commit()
# 关闭数据库连接
a_conn.close()
# 关闭数据库连接
b_conn.close()
在这个脚本中,我们使用了一个数组 WITHDRAW_RECORD_DB_NAMES
来表示需要同步的数据库名,同时使用了一个变量 TABLE_NAME
来存储表名。循环遍历这个数组,对于每个数据库名,我们使用 psycopg2
库连接到相应的数据库 A。然后,我们从相应的 withdraw_record
表中查询符合条件的记录,将这些记录插入到数据库 B 的 tmp_withdraw_pass
表中。在插入记录时,我们将 app_name
字段设置为数据库名,因为在同步到 tmp_withdraw_pass
表时,我们需要知道这些记录来自哪个数据库。在插入记录时,我们将 day_num
字段设置为 0
,因为在更新时我们会将其更新为正确的值。最后,我们提交事务并关闭数据库连接。
好的,以下是修改后的代码:
pythonimport psycopg2
A_DB_NAMES = ["database_a1", "database_a2"]
A_TABLE_NAME = "withdraw_record"
B_DB_NAME = "database_b"
B_TABLE_NAME = "tmp_withdraw_pass"
# 数据库 A 连接信息
A_HOST = "host"
A_PORT = "port"
A_USER = "user"
A_PASSWORD = "password"
# 数据库 B 连接信息
B_HOST = "host"
B_PORT = "port"
B_USER = "user"
B_PASSWORD = "password"
def sync_withdraw_record():
# 遍历 A_DB_NAMES
for A_DB_NAME in A_DB_NAMES:
# 连接数据库 A
a_conn = psycopg2.connect(
dbname=A_DB_NAME,
user=A_USER,
password=A_PASSWORD,
host=A_HOST,
port=A_PORT
)
a_cursor = a_conn.cursor()
# 查询 withdraw_id 和 updated_at 字段
select_sql = f"SELECT withdraw_id, updated_at FROM {A_TABLE_NAME} WHERE status = ''"
a_cursor.execute(select_sql)
results = a_cursor.fetchall()
# 关闭数据库 A 连接
a_cursor.close()
a_conn.close()
# 连接数据库 B
b_conn = psycopg2.connect(
dbname=B_DB_NAME,
user=B_USER,
password=B_PASSWORD,
host=B_HOST,
port=B_PORT
)
b_cursor = b_conn.cursor()
# 遍历结果集,插入到数据库 B 的 tmp_withdraw_pass 表中
for i, result in enumerate(results):
withdraw_id, updated_at = result
day_num = (i // (len(results) // 10)) + 1
app_name = A_DB_NAME
insert_sql = f"INSERT INTO {B_TABLE_NAME} (withdraw_id, app_name, day_num, updated_at) VALUES (%s, %s, %s, %s)"
b_cursor.execute(insert_sql, (withdraw_id, app_name, day_num, updated_at))
# 提交事务并关闭连接
b_conn.commit()
b_cursor.close()
b_conn.close()
这样做的好处是使代码更加灵活,可以支持从多个不同的数据库中同步数据到同一个表中。
你说得对,我把 b_conn 的初始化移到循环外面,因为它不会受到 A_DB_NAME 的影响,只需要在循环内部初始化 b_cursor 就可以了。以下是修改后的代码:
pythonimport psycopg2
A_DB_NAMES = ["database_a1", "database_a2"]
A_TABLE_NAME = "withdraw_record"
B_DB_NAME = "database_b"
B_TABLE_NAME = "tmp_withdraw_pass"
# 数据库 A 连接信息
A_HOST = "host"
A_PORT = "port"
A_USER = "user"
A_PASSWORD = "password"
# 数据库 B 连接信息
B_HOST = "host"
B_PORT = "port"
B_USER = "user"
B_PASSWORD = "password"
# 连接数据库 B
b_conn = psycopg2.connect(
dbname=B_DB_NAME,
user=B_USER,
password=B_PASSWORD,
host=B_HOST,
port=B_PORT
)
def sync_withdraw_record():
# 遍历 A_DB_NAMES
for A_DB_NAME in A_DB_NAMES:
# 连接数据库 A
a_conn = psycopg2.connect(
dbname=A_DB_NAME,
user=A_USER,
password=A_PASSWORD,
host=A_HOST,
port=A_PORT
)
a_cursor = a_conn.cursor()
# 查询 withdraw_id 和 updated_at 字段
select_sql = f"SELECT withdraw_id, updated_at FROM {A_TABLE_NAME} WHERE status = ''"
a_cursor.execute(select_sql)
results = a_cursor.fetchall()
# 关闭数据库 A 连接
a_cursor.close()
a_conn.close()
# 初始化 b_cursor
b_cursor = b_conn.cursor()
# 遍历结果集,插入到数据库 B 的 tmp_withdraw_pass 表中
for i, result in enumerate(results):
withdraw_id, updated_at = result
day_num = (i // (len(results) // 10)) + 1
app_name = A_DB_NAME
insert_sql = f"INSERT INTO {B_TABLE_NAME} (withdraw_id, app_name, day_num, updated_at) VALUES (%s, %s, %s, %s)"
b_cursor.execute(insert_sql, (withdraw_id, app_name, day_num, updated_at))
# 提交事务并关闭 b_cursor
b_conn.commit()
b_cursor.close()
这样可以提高程序的效率,减少连接数据库的次数。
你说得对,把 b_cursor
也放到循环外面可以减少创建游标的次数,以下是修改后的代码:
pythonimport psycopg2
A_DB_NAMES = ["database_a1", "database_a2"]
A_TABLE_NAME = "withdraw_record"
B_DB_NAME = "database_b"
B_TABLE_NAME = "tmp_withdraw_pass"
# 数据库 A 连接信息
A_HOST = "host"
A_PORT = "port"
A_USER = "user"
A_PASSWORD = "password"
# 数据库 B 连接信息
B_HOST = "host"
B_PORT = "port"
B_USER = "user"
B_PASSWORD = "password"
# 连接数据库 B
b_conn = psycopg2.connect(
dbname=B_DB_NAME,
user=B_USER,
password=B_PASSWORD,
host=B_HOST,
port=B_PORT
)
# 初始化 b_cursor
b_cursor = b_conn.cursor()
def sync_withdraw_record():
# 遍历 A_DB_NAMES
for A_DB_NAME in A_DB_NAMES:
# 连接数据库 A
a_conn = psycopg2.connect(
dbname=A_DB_NAME,
user=A_USER,
password=A_PASSWORD,
host=A_HOST,
port=A_PORT
)
a_cursor = a_conn.cursor()
# 查询 withdraw_id 和 updated_at 字段
select_sql = f"SELECT withdraw_id, updated_at FROM {A_TABLE_NAME} WHERE status = ''"
a_cursor.execute(select_sql)
results = a_cursor.fetchall()
# 关闭数据库 A 连接
a_cursor.close()
a_conn.close()
# 遍历结果集,插入到数据库 B 的 tmp_withdraw_pass 表中
for i, result in enumerate(results):
withdraw_id, updated_at = result
day_num = (i // (len(results) // 10)) + 1
app_name = A_DB_NAME
insert_sql = f"INSERT INTO {B_TABLE_NAME} (withdraw_id, app_name, day_num, updated_at) VALUES (%s, %s, %s, %s)"
b_cursor.execute(insert_sql, (withdraw_id, app_name, day_num, updated_at))
# 提交事务
b_conn.commit()
# 关闭 b_cursor
b_cursor.close()
这样做的好处是可以减少游标的创建次数,提高程序的效率。
你可以通过以下命令使用 pip 安装 psycopg2 包:
phppip install psycopg2-binary
注意,如果你使用的是 Python 3.8 或更早的版本,需要使用以下命令:
pip install psycopg2
但是在 Python 3.9 及更高版本中,psycopg2 不再支持 Python 3.8 或更早版本,因此需要使用 psycopg2-binary。