forked from shiyindebcd/ProcessTrader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
read_write_file.py
125 lines (86 loc) · 4.21 KB
/
read_write_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# # coding:utf-8
import sys
import os
import time
import datetime
import pandas as pd
class ReadWriteCsv(object): # csv文件读写类
def add_dict_to_csv(self, my_df, path):
data = self.read_csv_file(path)
data = pd.concat([data, my_df], ignore_index=True)
self.write_datas_to_csv_file(data, path)
print('数据已存入config文件')
def read_csv_file(self, path, converters=None): # 加了converters,读取时强制按指定类型读取
df = pd.read_csv(path, engine='python', index_col=0, converters=converters)
return df
def write_datas_to_csv_file(self, df_tmp, path, encoding='utf_8_sig'):
df = pd.DataFrame(df_tmp)
df.to_csv(path, encoding=encoding)
def judge_dirs_exist(self, dirs): # 判断目录是否存在
if not os.path.exists(dirs):
os.makedirs(dirs)
print('文件目录' + dirs + '不存在,已创建')
else:
pass
def judge_file_exist(self, path): # 判断文件是否存在
if not os.path.exists(path):
df_tmp = pd.DataFrame()
self.write_datas_to_csv_file(df_tmp, path)
print('文件' + path + '不存在,已创建空白文件')
else:
pass
def delete_file(self, path):
os.remove(path)
class ReadWriteExcel(object): # excel文件读写类
def add_dict_to_excel(self, my_df, path):
data = self.read_excel_file(path)
data = pd.concat([data, my_df], ignore_index=True)
self.write_config_file(data, path)
print('数据已存入config文件')
def read_excel_file(self, path):
df = pd.read_excel(path,index_col=0)
return df
def write_config_file(self, df_tmp, path, encoding='utf-8',index=False):
df = pd.DataFrame(df_tmp)
df.to_excel(path)
def judge_config_exist(self, path):
dirs = './data/'
if not os.path.exists(dirs):
os.makedirs(dirs)
print('文件目录' + dirs + '不存在,已创建')
else:
pass
if not os.path.exists(path):
df_tmp = pd.DataFrame()
self.write_config_file(df_tmp, path)
print('文件' + path + '不存在,已创建空白文件')
else:
print('文件' + path + '存在')
class Logger(object): # 日志记录类
def __init__(self, process_name):
self.path = "./log/" + process_name + '/' # 在log目录下创建以 process_name 命名的文件夹
self.log_filename = datetime.datetime.now().strftime('%Y_%m_%d') + '.log' # 日志文件名,以当前日期为名
self.terminal = sys.stdout
# 检查self.path是否存在,不存在则创建
if not os.path.exists(self.path):
os.makedirs(self.path)
# 检查self.path下是否存在self.log_filename文件,不存在则创建空白文件
if not os.path.exists(self.path + self.log_filename):
with open(self.path + self.log_filename, 'w') as f:
f.write('')
print('空白日志文件已创建,路径为:' + self.path + self.log_filename)
# 打开self.path下的self.log_filename文件,并写入日志
self.log = open(os.path.join(self.path, self.log_filename), "a", buffering = 1, encoding='utf8',)
# open(file, ‘w’, buffering = a) 中,buffering设置缓冲行为
# 全缓冲: a 是正整数,当缓冲区文件大小达到a大小时候,写入磁盘
# 行缓冲: buffering = 1, 缓冲区碰到 \n 换行符的时候就写入磁盘
# 无缓冲:buffering = 0 ,写多少,存多少
print('*' * 40,' ', self.log_filename, ' ', '*' * 40)
print('\n')
print('开始记录 ', process_name, ' 的运行日志 。。。。。。\n')
print('开始时间为: ', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), '\n\n')
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
pass