Skip to content

Commit

Permalink
支持短暂类型节点的恢复。
Browse files Browse the repository at this point in the history
  • Loading branch information
majian159 committed Dec 29, 2016
1 parent bae01cf commit 6574755
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 25 deletions.
175 changes: 150 additions & 25 deletions src/Rabbit.Zookeeper/Implementation/NodeEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ internal class NodeEntry
/// </summary>
private NodeChildrenChangeHandler _childrenChangeHandler;

/// <summary>
/// 节点的快照。
/// </summary>
private NodeSnapshot _localSnapshot = default(NodeSnapshot);

#endregion Field

#region Property
Expand All @@ -48,6 +53,8 @@ public async Task<IEnumerable<byte>> GetDataAsync(bool watch = false)
var zookeeper = _client.ZooKeeper;
var data = await zookeeper.getDataAsync(Path, watch);

_localSnapshot.SetData(data?.Data);

return data?.Data;
}

Expand All @@ -56,6 +63,8 @@ public async Task<IEnumerable<string>> GetChildrenAsync(bool watch = false)
var zookeeper = _client.ZooKeeper;
var data = await zookeeper.getChildrenAsync(Path, watch);

_localSnapshot.SetChildrens(data?.Children);

return data?.Children;
}

Expand All @@ -64,25 +73,39 @@ public async Task<bool> ExistsAsync(bool watch = false)
var zookeeper = _client.ZooKeeper;
var data = await zookeeper.existsAsync(Path, watch);

return data != null;
var exists = data != null;

_localSnapshot.SetExists(exists);

return exists;
}

public async Task<string> CreateAsync(byte[] data, List<ACL> acls, CreateMode createMode)
{
var zooKeeper = _client.ZooKeeper;
return await zooKeeper.createAsync(Path, data, acls, createMode);
var path = await zooKeeper.createAsync(Path, data, acls, createMode);

_localSnapshot.Create(createMode, data, acls);

return path;
}

public Task<Stat> SetDataAsync(byte[] data, int version = -1)
{
var zooKeeper = _client.ZooKeeper;
return zooKeeper.setDataAsync(Path, data, version);
var stat = zooKeeper.setDataAsync(Path, data, version);

_localSnapshot.Update(data, version);

return stat;
}

public Task DeleteAsync(int version = -1)
public async Task DeleteAsync(int version = -1)
{
var zookeeper = _client.ZooKeeper;
return zookeeper.deleteAsync(Path, version);
await zookeeper.deleteAsync(Path, version);

_localSnapshot.Delete();
}

#region Listener
Expand Down Expand Up @@ -175,6 +198,26 @@ internal async Task OnChange(WatchedEvent watchedEvent, bool isFirstConnection)
/// </summary>
private bool HasChildrenChangeHandler => HasHandler(_childrenChangeHandler);

/// <summary>
/// 状态变更处理。
/// </summary>
/// <param name="watchedEvent"></param>
/// <param name="isFirstConnection">是否是zk第一次连接上服务器。</param>
private async Task OnStatusChangeHandle(WatchedEvent watchedEvent, bool isFirstConnection)
{
//第一次连接zk不进行通知
if (isFirstConnection)
return;

//尝试恢复节点
await RestoreEphemeral();

if (HasDataChangeHandler)
await OnDataChangeHandle(watchedEvent);
if (HasChildrenChangeHandler)
await OnChildrenChangeHandle(watchedEvent);
}

private async Task OnDataChangeHandle(WatchedEvent watchedEvent)
{
if (!HasDataChangeHandler)
Expand Down Expand Up @@ -220,23 +263,6 @@ private async Task OnDataChangeHandle(WatchedEvent watchedEvent)
await WatchDataChange();
}

/// <summary>
/// 状态变更处理。
/// </summary>
/// <param name="watchedEvent"></param>
/// <param name="isFirstConnection">是否是zk第一次连接上服务器。</param>
private async Task OnStatusChangeHandle(WatchedEvent watchedEvent, bool isFirstConnection)
{
//第一次连接zk不进行通知
if (isFirstConnection)
return;

if (HasDataChangeHandler)
await OnDataChangeHandle(watchedEvent);
if (HasChildrenChangeHandler)
await OnChildrenChangeHandle(watchedEvent);
}

private async Task OnChildrenChangeHandle(WatchedEvent watchedEvent)
{
if (!HasChildrenChangeHandler)
Expand All @@ -261,16 +287,18 @@ private async Task OnChildrenChangeHandle(WatchedEvent watchedEvent)
switch (watchedEvent.get_Type())
{
case Watcher.Event.EventType.NodeCreated:
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeCreated, await getCurrentChildrens());
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeCreated,
await getCurrentChildrens());
break;

case Watcher.Event.EventType.NodeDeleted:
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeDeleted, null);
break;

case Watcher.Event.EventType.NodeChildrenChanged:
case Watcher.Event.EventType.None://重连时触发
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeChildrenChanged, await getCurrentChildrens());
case Watcher.Event.EventType.None: //重连时触发
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeChildrenChanged,
await getCurrentChildrens());
break;

default:
Expand Down Expand Up @@ -309,6 +337,103 @@ private static bool HasHandler(MulticastDelegate multicast)
return multicast != null && multicast.GetInvocationList().Any();
}

private async Task RestoreEphemeral()
{
//没有开启恢复
if (!_client.Options.EnableEphemeralNodeRestore)
return;

//节点不存在
if (!_localSnapshot.IsExist)
return;

//不是短暂的节点
if (_localSnapshot.Mode != CreateMode.EPHEMERAL && _localSnapshot.Mode != CreateMode.EPHEMERAL_SEQUENTIAL)
return;

try
{
await _client.RetryUntilConnected(async () =>
{
try
{
return await CreateAsync(_localSnapshot.Data?.ToArray(), _localSnapshot.Acls, _localSnapshot.Mode);
}
catch (KeeperException.NodeExistsException) //节点已经存在则忽略
{
return Path;
}
});
}
catch (Exception exception)
{
Console.WriteLine($"恢复节点失败,异常:{exception.Message}");
}
}

#endregion Private Method

#region Help Type

public struct NodeSnapshot
{
public bool IsExist { get; set; }
public CreateMode Mode { get; set; }
public IEnumerable<byte> Data { get; set; }
public int? Version { get; set; }
public List<ACL> Acls { get; set; }
public IEnumerable<string> Childrens { get; set; }

public void Create(CreateMode mode, byte[] data, List<ACL> acls)
{
IsExist = true;
Mode = mode;
Data = data;
Version = -1;
Acls = acls;
Childrens = null;
}

public void Update(IEnumerable<byte> data, int version)
{
IsExist = true;
Data = data;
Version = version;
}

public void Delete()
{
IsExist = false;
Mode = null;
Data = null;
Version = null;
Acls = null;
Childrens = null;
}

public void SetData(IEnumerable<byte> data)
{
IsExist = true;
Data = data;
}

public void SetChildrens(IEnumerable<string> childrens)
{
IsExist = true;
Childrens = childrens;
}

public void SetExists(bool exists)
{
if (!exists)
{
Delete();
return;
}
IsExist = true;
}
}

#endregion Help Type
}
}
8 changes: 8 additions & 0 deletions src/Rabbit.Zookeeper/ZookeeperClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ZookeeperClientOptions
/// <see cref="SessionId"/> 为0。
/// <see cref="SessionPasswd"/> 为null。
/// <see cref="BasePath"/> 为null。
/// <see cref="EnableEphemeralNodeRestore"/> 为true。
/// </remarks>
public ZookeeperClientOptions()
{
Expand All @@ -27,6 +28,7 @@ public ZookeeperClientOptions()
ReadOnly = false;
SessionId = 0;
SessionPasswd = null;
EnableEphemeralNodeRestore = true;
}

/// <summary>
Expand All @@ -42,6 +44,7 @@ public ZookeeperClientOptions()
/// <see cref="SessionId"/> 为0。
/// <see cref="SessionPasswd"/> 为null。
/// <see cref="BasePath"/> 为null。
/// <see cref="EnableEphemeralNodeRestore"/> 为true。
/// </remarks>
public ZookeeperClientOptions(string connectionString) : this()
{
Expand Down Expand Up @@ -90,5 +93,10 @@ public ZookeeperClientOptions(string connectionString) : this()
/// 基础路径,会在所有的zk操作节点路径上加入此基础路径。
/// </summary>
public string BasePath { get; set; }

/// <summary>
/// 是否启用短暂类型节点的恢复。
/// </summary>
public bool EnableEphemeralNodeRestore { get; set; }
}
}

0 comments on commit 6574755

Please sign in to comment.