-
Notifications
You must be signed in to change notification settings - Fork 2
/
StreamBag.cs
152 lines (122 loc) · 4.34 KB
/
StreamBag.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using RobSharper.Ros.BagReader.Enumerators;
using RobSharper.Ros.BagReader.Records;
namespace RobSharper.Ros.BagReader
{
public class StreamBag : IBag
{
/// <summary>
/// Bag header obtained by <c>ReadHeader</c> while the instance is constructed.
/// </summary>
public BagHeader Header { get; }
/// <summary>
/// Connections of the bag. Each enumeration creates a new enumerator over the underlying
/// stream; the shared mutex is held from the moment the enumerator is created until it is
/// disposed, so always dispose the enumerator (e.g. via <c>foreach</c>).
/// </summary>
public IEnumerable<Connection> Connections { get; }
/// <summary>
/// Messages of the bag. Each enumeration creates a new enumerator over the underlying
/// stream; the shared mutex is held from the moment the enumerator is created until it is
/// disposed, so always dispose the enumerator (e.g. via <c>foreach</c>).
/// </summary>
public IEnumerable<BagMessage> Messages { get; }
/// <summary>
/// Creates a bag on top of a seekable stream. The header is read right away; the
/// connection and message collections only store the stream and read from it when enumerated.
/// </summary>
/// <param name="bag">Seekable stream containing the bag data.</param>
/// <exception cref="ArgumentNullException"><paramref name="bag"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException"><paramref name="bag"/> does not support seeking.</exception>
public StreamBag(Stream bag)
{
    if (bag == null)
    {
        throw new ArgumentNullException(nameof(bag));
    }

    if (bag.CanSeek == false)
    {
        throw new InvalidOperationException("Cannot seek on stream");
    }

    // A single mutex is shared by the header read and by both collections,
    // because all of them operate on the same stream instance.
    var streamLock = new Mutex();

    Header = ReadHeader(bag, streamLock);
    Connections = new ConnectionCollection(bag, streamLock);
    Messages = new MessageCollection(bag, streamLock);
}
/// <summary>
/// Reads the bag header from <paramref name="bag"/> while holding <paramref name="mutex"/>,
/// then seeks the stream back to the position it had when the read started.
/// </summary>
/// <param name="bag">Seekable stream to read from.</param>
/// <param name="mutex">Mutex guarding access to <paramref name="bag"/>.</param>
/// <returns>
/// The header captured by the <c>HeaderVisitor</c>, or <c>null</c> if the reader reports
/// no further records before a header was seen.
/// </returns>
private static BagHeader ReadHeader(Stream bag, Mutex mutex)
{
    // Acquire outside of the try block: if WaitOne itself fails, the cleanup below
    // must not call ReleaseMutex on a mutex this thread does not own.
    mutex.WaitOne();
    try
    {
        // Read the position only once the lock is held so that it cannot be
        // changed by another holder between the read and the restore.
        var initialPosition = bag.Position;
        try
        {
            var headerVisitor = new HeaderVisitor();
            var rosbag = BagReaderFactory.Create(bag, headerVisitor);

            // Process at least one record; stop as soon as the visitor has a header.
            do
            {
                rosbag.ProcessNext();
            } while (headerVisitor.Header == null && rosbag.HasNext());

            return headerVisitor.Header;
        }
        finally
        {
            bag.Seek(initialPosition, SeekOrigin.Begin);
        }
    }
    finally
    {
        // Separate finally so the mutex is released even when the Seek above throws.
        mutex.ReleaseMutex();
    }
}
/// <summary>
/// Base class for the lazily evaluated collections of a bag. Every call to
/// <see cref="GetEnumerator()"/> creates a fresh enumerator over the shared stream and
/// wraps it in a <c>StreamResettingMutexEnumerator</c>.
/// </summary>
/// <typeparam name="T">Element type produced by the enumeration.</typeparam>
private abstract class Collection<T> : IEnumerable<T>
{
    private readonly Stream _stream;
    private readonly Mutex _mutex;

    internal Collection(Stream stream, Mutex mutex)
    {
        _stream = stream;
        _mutex = mutex;
    }

    /// <summary>
    /// Creates the enumerator for this collection. The returned enumerator keeps the
    /// mutex until it is disposed.
    /// </summary>
    public IEnumerator<T> GetEnumerator()
    {
        // The inner enumerator receives the shared stream, so build it under the lock too.
        // Mutex is re-entrant for the owning thread: the wrapper takes its own hold in its
        // constructor, and only the extra hold taken here is given back in the finally.
        _mutex.WaitOne();
        try
        {
            var enumerator = GetEnumerator(_stream);
            return new StreamResettingMutexEnumerator<T>(enumerator, _stream, _mutex);
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }

    /// <summary>
    /// Creates the concrete enumerator that reads elements from <paramref name="stream"/>.
    /// </summary>
    protected abstract IEnumerator<T> GetEnumerator(Stream stream);

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
/// <summary>
/// Collection whose enumerations are served by a <c>BagMessageEnumerator</c> created over the stream.
/// </summary>
private class MessageCollection : Collection<BagMessage>
{
    internal MessageCollection(Stream stream, Mutex mutex)
        : base(stream, mutex)
    {
    }

    /// <summary>Creates a new <c>BagMessageEnumerator</c> for <paramref name="stream"/>.</summary>
    protected override IEnumerator<BagMessage> GetEnumerator(Stream stream) =>
        new BagMessageEnumerator(stream);
}
/// <summary>
/// Collection whose enumerations are served by a <c>BagConnectionEnumerator</c> created over the stream.
/// </summary>
private class ConnectionCollection : Collection<Connection>
{
    internal ConnectionCollection(Stream stream, Mutex mutex)
        : base(stream, mutex)
    {
    }

    /// <summary>Creates a new <c>BagConnectionEnumerator</c> for <paramref name="stream"/>.</summary>
    protected override IEnumerator<Connection> GetEnumerator(Stream stream) =>
        new BagConnectionEnumerator(stream);
}
/// <summary>
/// Enumerator decorator that acquires the given mutex when it is constructed and, on
/// <see cref="Dispose"/>, seeks the stream back to the position it had at construction
/// and releases the mutex again.
/// </summary>
/// <remarks>
/// <see cref="Mutex"/> has thread affinity: the enumerator must be disposed on the thread
/// that created it.
/// NOTE(review): the wrapped enumerator is not disposed here (unchanged from the previous
/// behavior) - confirm whether disposing it would be safe for the shared stream.
/// </remarks>
/// <typeparam name="T">Element type of the wrapped enumerator.</typeparam>
private class StreamResettingMutexEnumerator<T> : IEnumerator<T>
{
    private readonly IEnumerator<T> _inner;
    private readonly Stream _stream;
    private readonly Mutex _mutex;
    private readonly long _initialPosition;
    private bool _disposed;

    /// <summary>
    /// Wraps <paramref name="enumerator"/>, blocks until <paramref name="mutex"/> is acquired
    /// and remembers the current position of <paramref name="stream"/>.
    /// </summary>
    public StreamResettingMutexEnumerator(IEnumerator<T> enumerator, Stream stream, Mutex mutex)
    {
        _inner = enumerator;
        _stream = stream;
        _mutex = mutex;

        _mutex.WaitOne();
        try
        {
            _initialPosition = _stream.Position;
        }
        catch
        {
            // Construction failed, so no caller will ever get an instance to dispose:
            // give the mutex back here, otherwise it would stay locked.
            _mutex.ReleaseMutex();
            throw;
        }
    }

    public bool MoveNext()
    {
        return _inner.MoveNext();
    }

    public void Reset()
    {
        _inner.Reset();
    }

    public T Current => _inner.Current;

    object IEnumerator.Current => Current;

    /// <summary>
    /// Restores the stream position and releases the mutex. Subsequent calls do nothing.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
            return;

        // Flag first: even if the seek below throws, the mutex is released exactly once
        // and a repeated Dispose call cannot release it a second time.
        _disposed = true;
        try
        {
            _stream.Seek(_initialPosition, SeekOrigin.Begin);
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }
}
}
}