-
Notifications
You must be signed in to change notification settings - Fork 13
/
baidufuse2.py
450 lines (380 loc) · 14.8 KB
/
baidufuse2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
#!/usr/bin/python
# -*- coding: utf-8 -*-
import stat
import errno
import os
import sys
import math
from threading import Lock, Thread
try:
import _find_fuse_parts
except ImportError:
pass
import json
import time
from fuse import FUSE, FuseOSError, Operations
from baidupcsapi import PCS
import logging
import tempfile
import progressbar
logger = logging.getLogger('BaiduFS')
formatter = logging.Formatter(
'%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',)
stream_handler = logging.StreamHandler(sys.stderr)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.setLevel(logging.NOTSET)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S')
class NoSuchRowException(Exception):
pass
class NoUniqueValueException(Exception):
pass
class ProgressBar():
def __init__(self):
self.first_call = True
def __call__(self, *args, **kwargs):
if self.first_call:
self.widgets = [progressbar.Percentage(), ' ', progressbar.Bar(marker=progressbar.RotatingMarker('>')),
' ', progressbar.FileTransferSpeed()]
self.pbar = progressbar.ProgressBar(widgets=self.widgets, maxval=kwargs['size']).start()
self.first_call = False
if kwargs['size'] <= kwargs['progress']:
self.pbar.finish()
else:
self.pbar.update(kwargs['progress'])
class File():
def __init__(self):
self.dict = {'bd_fsid':0,
'bd_blocklist':0,
'bd_md5':0,
'st_mode':0,
'st_ino':0,
'st_dev':0,
'st_nlink':0,
'st_uid':0,
'st_gid':0,
'st_size':0,
'st_atime':0,
'st_mtime':0,
'st_ctime':0}
def __getitem__(self, item):
return self.dict[item]
def __setitem__(self, key, value):
self.dict[key] = value
def __str__(self):
return self.dict.__repr__()
def __repr__(self):
return self.dict.__repr__()
def getDict(self):
return self.dict
class BaiduFS(Operations):
'''Baidu netdisk filesystem'''
def __init__(self, username, password, *args, **kw):
self.disk = PCS(username, password)
self.buffer = {}
self.traversed_folder = {}
self.bufferLock = Lock()
self.upload_blocks = {} # 文件上传时用于记录块的md5,{PATH:{TMP:'',BLOCKS:''}
self.create_tmp = {} # {goutputstrem_path:file}
self.upload_fails = {} #
self.fd = 3
# 初始化百度服务器
print '设置pcs服务器'
pcs = self.disk.get_fastest_pcs_server()
self.disk.set_pcs_server(pcs)
print 'pcs api server:',pcs
'''
print '设置百度网盘服务器,时间比较长,请耐心等待'
pan = self.disk.get_fastest_mirror()
self.disk.set_pan_server(pan)
print 'baidupan server',pan
'''
self.uploadLock = Lock() # 上传文件时不刷新目录
self.readLock = Lock()
self.downloading_files = {}
def unlink(self, path):
print '*'*10,'UNLINK CALLED',path
self.disk.delete([path])
def _add_file_to_buffer(self, path,file_info):
foo = File()
foo['st_ctime'] = file_info['local_ctime']
foo['st_mtime'] = file_info['local_mtime']
foo['st_mode'] = (stat.S_IFDIR | 0777) if file_info['isdir'] \
else (stat.S_IFREG | 0777)
foo['st_nlink'] = 2 if file_info['isdir'] else 1
foo['st_size'] = file_info['size']
self.buffer[path] = foo
def _del_file_from_buffer(self,path):
self.buffer.pop(path)
def getattr(self, path, fh=None):
#print 'getattr *',path
# 先看缓存中是否存在该文件
if not self.buffer.has_key(path):
print path,'未命中'
#print self.buffer
#print self.traversed_folder
jdata = json.loads(self.disk.meta([path]).content)
try:
if 'info' not in jdata:
raise FuseOSError(errno.ENOENT)
if jdata['errno'] != 0:
raise FuseOSError(errno.ENOENT)
file_info = jdata['info'][0]
self._add_file_to_buffer(path,file_info)
st = self.buffer[path].getDict()
return st
except:
raise FuseOSError(errno.ENOENT)
else:
#print path,'命中'
return self.buffer[path].getDict()
def readdir(self, path, offset):
while True:
try:
logger.debug(u'读取目录' + path)
foo = json.loads(self.disk.list_files(path).text)
break
except Exception as s:
logger.error('error',str(s))
files = ['.', '..']
abs_files = [] # 该文件夹下文件的绝对路径
for file in foo['list']:
files.append(file['server_filename'])
abs_files.append(file['path'])
# 缓存文件夹下文件信息,批量查询meta info
# Update:解决meta接口一次不能查询超过100条记录
# 分成 ceil(file_num / 100.0) 组,利用商群
if not self.traversed_folder.has_key(path) or self.traversed_folder[path] == False:
logger.debug(u'正在对'+path+u'缓存中')
file_num = len(abs_files)
group = int(math.ceil(file_num / 100.0))
for i in range(group):
obj = [f for n,f in enumerate(abs_files) if n % group == i] #一组数据
while 1:
try:
ret = json.loads(self.disk.meta(obj).text)
break
except:
print 'error'
for file_info in ret['info']:
if not self.buffer.has_key(file_info['path']):
self._add_file_to_buffer(file_info['path'],file_info)
#print self.buffer
print '对',path,'的缓存完成'
self.traversed_folder[path] = True
for r in files:
yield r
def _update_file_manual(self,path):
while 1:
try:
jdata = json.loads(self.disk.meta([path]).content)
break
except:
print 'error'
if 'info' not in jdata:
raise FuseOSError(errno.ENOENT)
if jdata['errno'] != 0:
raise FuseOSError(errno.ENOENT)
file_info = jdata['info'][0]
self._add_file_to_buffer(path,file_info)
def rename(self, old, new):
#logging.debug('* rename',old,os.path.basename(new))
print '*'*10,'RENAME CALLED',old,os.path.basename(new),type(old),type(new)
while True:
try:
ret = self.disk.rename([(old,os.path.basename(new))]).content
jdata = json.loads(ret)
break
except:
print 'error'
if jdata['errno'] != 0:
# 文件名已存在,删除原文件
print self.disk.delete([new]).content
print self.disk.rename([(old,os.path.basename(new))])
self._update_file_manual(new)
self.buffer.pop(old)
def open(self, path, flags):
self.readLock.acquire()
print '*'*10,'OPEN CALLED',path,flags
#print '[****]',path
"""
Permission denied
accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
if (flags & accmode) != os.O_RDONLY:
raise FuseOSError(errno.EACCES)
"""
self.fd += 1
self.readLock.release()
return self.fd
def create(self, path, mode,fh=None):
# 创建文件
# 中文路径有问题
print '*'*10,'CREATE CALLED',path,mode,type(path)
#if 'outputstream' not in path:
tmp_file = tempfile.TemporaryFile('r+w+b')
foo = self.disk.upload(os.path.dirname(path),tmp_file,os.path.basename(path)).content
ret = json.loads(foo)
print ret
print 'create-not-outputstream',ret
if ret['path'] != path:
# 文件已存在
print '文件已存在'
raise FuseOSError(errno.EEXIST)
'''
else:
print 'create:',path
foo = File()
foo['st_ctime'] = int(time.time())
foo['st_mtime'] = int(time.time())
foo['st_mode'] = (stat.S_IFREG | 0777)
foo['st_nlink'] = 1
foo['st_size'] = 0
self.buffer[path] = foo
'''
'''
dict(st_mode=(stat.S_IFREG | mode), st_nlink=1,
st_size=0, st_ctime=time.time(), st_mtime=time.time(),
st_atime=time.time())
'''
self.fd += 1
return 0
def write(self, path, data, offset, fp):
# 上传文件时会调用
# 4kb ( 4096 bytes ) 每块,data中是块中的数据
# 最后一块的判断:len(data) < 4096
# 文件大小 = 最后一块的offset + len(data)
# 4kb传太慢了,合计成2M传一次
#print '*'*10,path,offset, len(data)
def _block_size(stream):
stream.seek(0,2)
return stream.tell()
_BLOCK_SIZE = 16 * 2 ** 20
# 第一块的任务
if offset == 0:
#self.uploadLock.acquire()
#self.readLock.acquire()
# 初始化块md5列表
self.upload_blocks[path] = {'tmp':None,
'blocks':[]}
# 创建缓冲区临时文件
tmp_file = tempfile.TemporaryFile('r+w+b')
self.upload_blocks[path]['tmp'] = tmp_file
# 向临时文件写入数据,检查是否>= _BLOCK_SIZE 是则上传该块并将临时文件清空
try:
tmp = self.upload_blocks[path]['tmp']
except KeyError:
return 0
tmp.write(data)
if _block_size(tmp) > _BLOCK_SIZE:
print path,'发生上传'
tmp.seek(0)
try:
foo = self.disk.upload_tmpfile(tmp,callback=ProgressBar()).content
foofoo = json.loads(foo)
block_md5 = foofoo['md5']
except:
print foo
# 在 upload_blocks 中插入本块的 md5
self.upload_blocks[path]['blocks'].append(block_md5)
# 创建缓冲区临时文件
self.upload_blocks[path]['tmp'].close()
tmp_file = tempfile.TemporaryFile('r+w+b')
self.upload_blocks[path]['tmp'] = tmp_file
print '创建临时文件',tmp_file.name
# 最后一块的任务
if len(data) < 4096:
# 检查是否有重名,有重名则删除它
while True:
try:
foo = self.disk.meta([path]).content
foofoo = json.loads(foo)
break
except:
print 'error'
if foofoo['errno'] == 0:
logger.debug('Deleted the file which has same name.')
self.disk.delete([path])
# 看看是否需要上传
if _block_size(tmp) != 0:
# 此时临时文件有数据,需要上传
print path,'发生上传,块末尾,文件大小',_block_size(tmp)
tmp.seek(0)
while True:
try:
foo = self.disk.upload_tmpfile(tmp,callback=ProgressBar()).content
foofoo = json.loads(foo)
break
except:
print 'exception, retry.'
block_md5 = foofoo['md5']
# 在 upload_blocks 中插入本块的 md5
self.upload_blocks[path]['blocks'].append(block_md5)
# 调用 upload_superfile 以合并块文件
print '合并文件',path,type(path)
self.disk.upload_superfile(path,self.upload_blocks[path]['blocks'])
# 删除upload_blocks中数据
self.upload_blocks.pop(path)
# 更新本地文件列表缓存
self._update_file_manual(path)
#self.readLock.release()
#self.uploadLock.release()
return len(data)
def mkdir(self, path, mode):
logger.debug("mkdir is:" + path)
self.disk.mkdir(path)
def rmdir(self, path):
logger.debug("rmdir is:" + path)
self.disk.delete([path])
def read(self, path, size, offset, fh):
print '*'*10,'READ CALLED',path,size,offset
#logger.debug("read is: " + path)
# 改为由第三方工具下载并每次判断下载的临时文件大小
if offset == 0:
tmp = tempfile.mktemp()
url = self.disk.download_url([path])[0]
logger.debug('%s started downloader' % url)
"""
thread = threading.Thread(target=self.downlaoder, args=(url, tmp))
thread.start()
while thread.isAlive():
pass
"""
number = 5
cookies = ';'.join(['%s=%s' % (k, v)
for k, v in self.disk.session.cookies.items()])
cmd = 'axel --alternate -n{0} -H "Cookies:{1}" "{2}" -o "{3}"'.format(number, cookies, url, tmp)
logger.debug('now start axel on %s' % path)
os.system(cmd)
logger.debug('axel on %s done.' % path)
# self.downloader(url, tmp)
logger.debug('%s downloaded' % url)
self.downloading_files[path] = (tmp, open(tmp,'rb'))
file_handler = self.downloading_files[path][1]
return file_handler.read(size)
"""
paras = {'Range': 'bytes=%s-%s' % (offset, offset + size - 1)}
while True:
try:
foo = self.disk.download(path, headers=paras).content
return foo
except:
pass
"""
def downloader(self, url, path):
number = 5
cookies = ';'.join(['%s=%s' % (k, v)
for k, v in self.disk.session.cookies.items()])
cmd = 'axel --alternate -n{0} -H "Cookies:{1}" {2} -o {3}'.format(number, cookies, url, path)
logger.debug('now start axel on %s' % path)
os.system(cmd)
logger.debug('axel on %s done.' % path)
return
access = None
statfs = None
if __name__ == '__main__':
if len(sys.argv) != 4:
print 'Usage {0} username password mountpoint'.format(sys.argv[0])
sys.exit(0)
FUSE(BaiduFS(sys.argv[1],sys.argv[2]),sys.argv[3],foreground=True, nonempty=True)