模拟审计日志数据

本文包括两大部分内容:1)对openGauss审计日志的调研,2)模拟一个简单教务系统的多用户审计日志

审计日志调研

日志位置

通过下面的语句可以查询到各种配置文件的路径
例如postgresql.conf文件路径:/var/lib/opengauss/data/postgresql.conf
select name,setting from pg_settings where category=’File Locations’;
20231122143940

查看审计日志的文件夹名:
SHOW audit_directory;
20231122143959

审计日志完整路径:/var/lib/opengauss/data/pg_audit
20231122144017

比较OG和PG 中的日志管理

| openGauss | pg_log
数据库运行日志,openGauss运行时数据库节点以及openGauss安装部署时产生的日志统称为系统日日志
pg_xlog
WAL 日志,事务日志信息即重做日志
pg_clog
事务提交日志,记录的是事务的元数据
pg_audit
审计功能开启时会不断产生大量的审计日志,占用磁盘空间。用户可以根据磁盘空间的大小设置审计日志维护策略。
gs_profile

性能日志指的是数据库系统在运行时检测物理资源的运行状态的日志,在对外部资源进行访问时的性能检测,包括磁盘等外部资源的访问检测信息。 |
| — | — |
| postgresql | pg_log
pg_xlog
pg_clog |

PostgreSQL日志管理-CSDN博客 日志参考

审计日志

openGauss将用户对数据库的所有操作写入审计日志。数据库安全管理员可以利用这些日志信息,重现导致数据库现状的一系列事件,找出非法操作的用户、时间和内容等。
默认时开启审计功能的audit_enabled,除了审计总开关,各个审计项也有对应的开关。只有开关开启,对应的审计功能才能生效。审计概述

审计相关的配置

下面列出了所有审计配置,其中就包括对各个审计项的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#------------------------------------------------------------------------------
# AUDIT
#------------------------------------------------------------------------------

audit_enabled = on
audit_directory = 'pg_audit' # 审计文件的存储目录
audit_data_format = 'binary' # 审计日志文件的格式,当前仅支持二进制格式(binary)
audit_rotation_interval = 1d # 创建一个新审计日志文件的时间间隔,default 1 day
audit_rotation_size = 10MB # 审计日志文件的最大容量, 超过此参数值时将生成一个新日志
audit_space_limit = 1024MB # 审计文件占用磁盘空间的最大值
audit_file_remain_threshold = 1048576 # 审计目录下审计文件的最大数量

# --------具体的审计项--------
# 1, 7 表示开启,0表示关闭

#---用户和权限审计
audit_login_logout = 7 # 用户登录、退出的审计
audit_database_process = 1 # 数据库启动、停止、恢复和切换的审计
audit_user_locked = 1 # 用户锁定和解锁
audit_user_violation = 0 # 用户越权操作审计
audit_grant_revoke = 1 # 用户权限授予和回收审计

#---各类操作审计
# 数据库对象的CREATE,ALTER,DROP操作审计,
# 由29个二进制位数表示,分别代表了26类数据库对象
# 12295表示:审计DATABASE, SCHEMA, USER, DATA SOURCE
audit_system_object = 12295

# 具体表的INSERT、UPDATE和DELETE操作审计, 默认关闭
# 注意不包括SELECT
audit_dml_state = 0
# 是否对SELECT操作进行审计
audit_dml_state_select = 0

# 在执行存储过程、匿名块或自定义函数(不包括系统自带函数)时是否记录审计信息
audit_function_exec = 0
# 是否对COPY操作进行审计
audit_copy_exec = 0
# 是否对SET操作进行审计
audit_set_parameter = 1
#------------------------------------------------------------------------------

配置审计项

这里我们在默认值的基础上,打开对表操作的审计,执行存储过程、函数的审计,copy操作的审计

1
2
3
4
5
6
7
8
9
10
11
12
13
# 12295表示:审计DATABASE, SCHEMA, USER, DATA SOURCE
# 12303表示打开第4位的审计,即对表格创建审计
audit_system_object = 12303

# 具体表的INSERT、UPDATE和DELETE操作审计
audit_dml_state = 0
# 是否对SELECT操作进行审计
audit_dml_state_select = 0

# 在执行存储过程、匿名块或自定义函数(不包括系统自带函数)时是否记录审计信息
audit_function_exec = 0
# 是否对COPY操作进行审计
audit_copy_exec = 0
1
2
3
4
5
6
7
8
9
10
11
12
# 检查audit_dml_state配置值
cat /var/lib/opengauss/data/postgresql.conf | grep audit_dml_state

# 修改配置
gs_guc set -D /var/lib/opengauss/data -c "audit_dml_state=1"
gs_guc set -D /var/lib/opengauss/data -c "audit_dml_state_select=1"
gs_guc set -D /var/lib/opengauss/data -c "audit_function_exec=1"
gs_guc set -D /var/lib/opengauss/data -c "audit_copy_exec=1"
gs_guc set -D /var/lib/opengauss/data -c "audit_system_object=12303"

# 重启数据库使参数生效
gs_ctl restart -D /var/lib/opengauss/data

我们也可以直接修改postgresql.conf文件来配置,修改后也需要重启数据库。

查询审计日志

通过下面的查询语句,可以查询到指定时间段内的所有审计日志。

1
openGauss=# select * from pg_query_audit('2023-05-18 00:00:00','2025-05-18 08:00:00');
1
2
3
4
          time          |      type      | result | userid | username  | database  |     client_conninfo     |    object_name    |	detail_info	|     node_name     |            thread_id            | local_port | remote_port
------------------------+----------------+--------+--------+-----------+-----------+-------------------------+-------------------+--------------+-------------------+---------------------------------+------------+-------------
2021-03-04 08:00:08+08 | login_success | ok | 10 | omm | postgres | gsql@::1 | postgres | login db(postgres) success, SSL=off | dn_6001_6002_6003 | 140477687527168@668131208211425 |17778 | 46946

20231122144049

审计日志存储到表格

在前面我们可以通过查询语句查看日志,但是仍不方便。我们可以使用下面的语句将查询到的日志信息存储到表格中,方便我们观察、过滤日志信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE audit_log
(
log_time timestamp(3) with time zone,
type text,
result text,

userid integer,
username text,
database text,
client_conninfo text,
object_name text,
detail_info text,
node_name text,

thread_id text,
local_port text,
remote_port text,

);

SELECT * INTO audit_log FROM pg_query_audit('2023-05-18 00:00:00','2025-05-18 08:00:00');

--INSERT INTO pm.audit_log SELECT * FROM pg_query_audit('2023-05-18 00:00:00','2025-05-18 08:00:00');

20231122144108

模拟审计日志

我们假设需要模拟一个多数据库用户场景下的教务管理系统的审计日志,分为下面几个步骤:设计教务管理数据库,创建多用户,创建数据库,使用python脚本模拟各个用户的操作。
20231122144118

数据库设计

db_school设计

数据库设计_用户操作模型.drawio.png

public模式设计

这个教务管理系统中由4个表组成:
教师表(工号,姓名)
学生表(学号,姓名,班级)
课程表(课程号,授课老师工号,课程名,学分)
成绩表(学号,课程号,成绩)
TODO:修改ER图,score表的键应该是student_id+course_id
20231122145346

创建多用户

多用户设计

opengauss:超级管理员,创建用户pm

  • pm:项目架构师;负责设计、创建数据库、表格、视图;创建下面的用户
    • t_coder:老师端,主要对成绩表增、改
    • s_coder:学生端,成绩查询
    • affair:教务处,负责增删学生表、老师表、课程表

各个用户的权限设计

openGauss支持以下的权限:SELECT、INSERT、UPDATE、DELETE、TRUNCATE、REFERENCES、CREATE、CONNECT、EXECUTE、USAGE、ALTER、DROP、COMMENT、INDEX和VACUUM。不同的权限与不同的对象类型关联。
对象所有者的权限(例如ALTER、 DROP、COMMENT、INDEX、VACUUM、GRANT和REVOKE)是隐式拥有的,即只要拥有对象就可以执行对象所有者的这些隐式权限。用户权限设置

pm sysadmin
t_coder SELECT ON ALL TABLES IN SCHEMA public
s_coder SELECT ON ALL TABLES IN SCHEMA public
UPDATE ON public.score
affair SELECT ON ALL TABLES IN SCHEMA public
UPDATE ON ALL TABLES IN SCHEMA public
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
--查看用户列表
SELECT * FROM pg_user;
--查看用户属性
SELECT * FROM pg_authid;

--opengauss创建pm为系统管理员
CREATE USER pm WITH PASSWORD 'xxxxxx';
ALTER USER pm sysadmin;

--pm创建其余用户
gsql -d db_school -U pm -p 7654 -W xxxxxx;
CREATE USER t_coder WITH PASSWORD 'xxxxxx';
CREATE USER s_coder WITH PASSWORD 'xxxxxx';
CREATE USER affair WITH PASSWORD 'xxxxxx';

具体对模式的访问权限设置,我们在下面创建好表格后再为用户分配权限。

创建数据库

使用pm用户创建数据库

1
2
3
4
5
6
7
8
9
10
-- 查看所有数据库
\l

-- 创建数据库
CREATE DATABASE db_school;
-- 修改编码
update pg_database set encoding = pg_char_to_encoding('UTF8') where datname = 'db_school';

-- 使用新建的数据库
\c db_school

在public下建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-- 2023-06-28 21:08:09.499 +0800
create table public.teacher
(
teacher_id integer primary key,
name text
);


create table public.student
(
student_id integer primary key,
name text,
class text
);

create table public.course
(
course_id integer primary key,
teacher_id integer references teacher(teacher_id),
cname text,
point float
);

create table public.score
(
student_id integer references student(student_id),
course_id integer references course(course_id),
score float,
primary key(student_id, course_id)
);

给用户赋予public模式权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
--查看当前数据库下的所有schema
\dn+

--给用户授予public模式的使用权 TODO
--所有用户具有public模式查询的权限
GRANT SELECT ON ALL TABLES IN SCHEMA public to s_coder, t_coder, affair;
--t_coder具有插入,更新score表的权限
GRANT INSERT,UPDATE ON public.score to t_coder;
--affair具有public模式下所有权限
GRANT INSERT,UPDATE ON ALL TABLES IN SCHEMA public to affair;

--切换到s_coder用户
\c - s_coder
--查看当前模式可访问的表
\dt
--设置搜索路径
set search_path=public;
--再次查看可访问的表
\dt

当没有给用户赋予访问schema的权限时,是无法操作表的:
20231122145419

添加了查询权限之后,使用命令 \dt 可以看到当前可以访问的表了。
20231122145433

模拟用户操作

准备40名学生、5名老师、5门课程信息
模拟5天操作的数据
20231122145856

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import psycopg2 as pg
import random

# 获取数据库连接对象
def get_conn(user='test_1'):
conn = pg.connect(database="test_school",
user=user,
password="123456twG",
host="127.0.0.1", port="7654")
return conn

# 从tsv文件导入数据
def insert_tsv(conn, table_name, tsv_path, sep='\t'):
print("Opened database successfully")
cur = conn.cursor()

copy_query = f"COPY public.{table_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE, DELIMITER E'\\t', ENCODING 'UTF-8')"
with open(tsv_path, 'r', encoding='utf-8') as file:
cur.copy_expert(copy_query, file)
print(f"Copy {tsv_path} ok")
# 提交事务
conn.commit()
# 关闭数据库连接
cur.close()
conn.close()

# Insert score
def insert_score(conn):
student_id = range(20230001, 20230041)
course_id = range(10, 15)
cur = conn.cursor()

insert_value = []
for s_id in student_id:
for c_id in course_id:
random_score = random.randint(75, 100)
insert_value.append((s_id, c_id, random_score))

insert_sql = "INSERT INTO public.score (student_id, course_id, score) VALUES (%s, %s, %s)"
cur.executemany(insert_sql, insert_value)
conn.commit()
cur.close()
conn.close()

# 修改成绩
def update_score(conn, repete_times=10):
cur = conn.cursor()

update_value = []
for i in range(repete_times):
s_id = random.randint(20230001, 20230040)
c_id = random.randint(10, 14)
det_score = random.randint(-5, 5)
update_value.append((det_score, s_id, c_id))

update_sql = "UPDATE public.score SET score = score+(%s) WHERE student_id=(%s) AND course_id=(%s) "
cur.executemany(update_sql, update_value)
conn.commit()
cur.close()
conn.close()

# 对表{table_name}指定{query_id}查询{repete_times}次
# 查询student teacher course都是调用这个函数
def select(conn, table_name, repete_times, query_id, id_range):
cur = conn.cursor()
query_value = []
for t in range(repete_times):
query_value.append((random.randint(id_range[0], id_range[1]),))
query_sql = f"SELECT * FROM public.{table_name} WHERE {query_id}=(%s)"

cur.executemany(query_sql, query_value)
conn.commit()
cur.close()
conn.close()

# 学生查询课表
def select_course(conn, repete_times):
cur = conn.cursor()
query_sql = "SELECT course_id, cname, name, point FROM public.course c, public.teacher t where c.teacher_id = t.teacher_id"

for i in range(repete_times):
cur.execute(query_sql)
result = cur.fetchall()
conn.commit()
cur.close()
conn.close()

# 学生查询成绩
def select_score(conn):
cur = conn.cursor()
query_sql = """SELECT s.student_id, s."name", c.cname, s2.score
FROM public.score s2, public.course c, public.student s
WHERE s2.course_id = c.course_id AND s2.student_id = s.student_id AND s.student_id=(%s)"""
query_value = [(i,) for i in range(20230001, 20230041)]

cur.executemany(query_sql, query_value)
conn.commit()
cur.close()
conn.close()

# 老师查询指定课程的所有成绩
def select_score_teacher(conn):
cur = conn.cursor()
query_value = [(i,) for i in range(10, 15)]
query_sql = "SELECT * FROM public.score WHERE course_id=(%s) ORDER BY score desc;"

cur.executemany(query_sql, query_value)
conn.commit()
cur.close()
conn.close()


模拟审计日志数据
https://chris-tang6.github.io/2023/07/06/模拟审计日志数据/
作者
Tang Wuguo
发布于
2023年7月6日
许可协议