Skip to content

Commit

Permalink
代码优化和注释完善。
Browse files Browse the repository at this point in the history
  • Loading branch information
majian159 committed Dec 29, 2016
1 parent 490e6ee commit bae01cf
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 44 deletions.
41 changes: 41 additions & 0 deletions src/Rabbit.Zookeeper/Delegates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ namespace Rabbit.Zookeeper
/// </summary>
public class ConnectionStateChangeArgs
{
/// <summary>
/// 连接状态。
/// </summary>
public Watcher.Event.KeeperState State { get; set; }
}

Expand All @@ -17,6 +20,11 @@ public class ConnectionStateChangeArgs
/// </summary>
public abstract class NodeChangeArgs
{
/// <summary>
/// 创建一个新的节点变更参数。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="type">事件类型。</param>
protected NodeChangeArgs(string path, Watcher.Event.EventType type)
{
Path = path;
Expand All @@ -34,8 +42,17 @@ protected NodeChangeArgs(string path, Watcher.Event.EventType type)
public string Path { get; private set; }
}

/// <summary>
/// 节点数据变更参数。
/// </summary>
public sealed class NodeDataChangeArgs : NodeChangeArgs
{
/// <summary>
/// 创建一个新的节点数据变更参数。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="type">事件类型。</param>
/// <param name="currentData">最新的节点数据。</param>
public NodeDataChangeArgs(string path, Watcher.Event.EventType type, IEnumerable<byte> currentData) : base(path, type)
{
CurrentData = currentData;
Expand All @@ -47,8 +64,17 @@ public NodeDataChangeArgs(string path, Watcher.Event.EventType type, IEnumerable
public IEnumerable<byte> CurrentData { get; private set; }
}

/// <summary>
/// 节点子节点变更参数。
/// </summary>
public sealed class NodeChildrenChangeArgs : NodeChangeArgs
{
/// <summary>
/// 创建一个新的节点子节点变更参数。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="type">事件类型。</param>
/// <param name="currentChildrens">最新的子节点集合。</param>
public NodeChildrenChangeArgs(string path, Watcher.Event.EventType type, IEnumerable<string> currentChildrens) : base(path, type)
{
CurrentChildrens = currentChildrens;
Expand All @@ -60,9 +86,24 @@ public NodeChildrenChangeArgs(string path, Watcher.Event.EventType type, IEnumer
public IEnumerable<string> CurrentChildrens { get; private set; }
}

/// <summary>
/// 节点数据变更委托。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="args">节点数据变更参数。</param>
public delegate Task NodeDataChangeHandler(IZookeeperClient client, NodeDataChangeArgs args);

/// <summary>
/// 节点子节点变更委托。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="args">节点子节点变更参数。</param>
public delegate Task NodeChildrenChangeHandler(IZookeeperClient client, NodeChildrenChangeArgs args);

/// <summary>
/// 连接状态变更委托。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="args">连接状态变更参数。</param>
public delegate Task ConnectionStateChangeHandler(IZookeeperClient client, ConnectionStateChangeArgs args);
}
185 changes: 168 additions & 17 deletions src/Rabbit.Zookeeper/IZookeeperClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

namespace Rabbit.Zookeeper
{
/// <summary>
/// 一个抽象的ZooKeeper客户端。
/// </summary>
public interface IZookeeperClient : IDisposable
{
/// <summary>
/// 具体的zookeeper连接
/// 具体的ZooKeeper连接
/// </summary>
ZooKeeper ZooKeeper { get; }

Expand All @@ -34,53 +37,174 @@ public interface IZookeeperClient : IDisposable
/// <returns>执行结果。</returns>
Task<T> RetryUntilConnected<T>(Func<Task<T>> callable);

/// <summary>
/// 获取指定节点的数据。
/// </summary>
/// <param name="path">节点路径。</param>
/// <returns>节点数据。</returns>
Task<IEnumerable<byte>> GetDataAsync(string path);

/// <summary>
/// 获取指定节点下的所有子节点。
/// </summary>
/// <param name="path">节点路径。</param>
/// <returns>子节点集合。</returns>
Task<IEnumerable<string>> GetChildrenAsync(string path);

/// <summary>
/// 判断节点是否存在。
/// </summary>
/// <param name="path">节点路径。</param>
/// <returns>如果存在则返回true,否则返回false。</returns>
Task<bool> ExistsAsync(string path);

Task CreateAsync(string path, byte[] data, List<ACL> acls, CreateMode createMode);
/// <summary>
/// 创建节点。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="acls">权限。</param>
/// <param name="createMode">创建模式。</param>
/// <returns>节点路径。</returns>
/// <remarks>
/// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
/// </remarks>
Task<string> CreateAsync(string path, byte[] data, List<ACL> acls, CreateMode createMode);

/// <summary>
/// 设置节点数据。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="version">版本号。</param>
/// <returns>节点状态。</returns>
Task<Stat> SetDataAsync(string path, byte[] data, int version = -1);

/// <summary>
/// 删除节点。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="version">版本号。</param>
Task DeleteAsync(string path, int version = -1);

/// <summary>
/// 订阅节点数据变更。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="listener">监听者。</param>
Task SubscribeDataChange(string path, NodeDataChangeHandler listener);

/// <summary>
/// 取消订阅节点数据变更。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="listener">监听者。</param>
void UnSubscribeDataChange(string path, NodeDataChangeHandler listener);

/// <summary>
/// 订阅连接状态变更。
/// </summary>
/// <param name="listener">监听者。</param>
void SubscribeStatusChange(ConnectionStateChangeHandler listener);

/// <summary>
/// 取消订阅连接状态变更。
/// </summary>
/// <param name="listener">监听者。</param>
void UnSubscribeStatusChange(ConnectionStateChangeHandler listener);

/// <summary>
/// 订阅节点子节点变更。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="listener">监听者。</param>
Task<IEnumerable<string>> SubscribeChildrenChange(string path, NodeChildrenChangeHandler listener);

/// <summary>
/// 取消订阅节点子节点变更。
/// </summary>
/// <param name="path">节点路径。</param>
/// <param name="listener">监听者。</param>
void UnSubscribeChildrenChange(string path, NodeChildrenChangeHandler listener);
}

/// <summary>
/// ZooKeeper客户端扩展方法。
/// </summary>
public static class ZookeeperClientExtensions
{
public static Task CreateEphemeralAsync(this IZookeeperClient client, string path, byte[] data)
/// <summary>
/// 创建短暂的节点。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="isSequential">是否按顺序创建。</param>
/// <returns>节点路径。</returns>
/// <remarks>
/// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
/// </remarks>
public static Task<string> CreateEphemeralAsync(this IZookeeperClient client, string path, byte[] data, bool isSequential = false)
{
return client.CreateEphemeralAsync(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
return client.CreateEphemeralAsync(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, isSequential);
}

public static Task CreateEphemeralAsync(this IZookeeperClient client, string path, byte[] data, List<ACL> acls)
/// <summary>
/// 创建短暂的节点。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="acls">权限。</param>
/// <param name="isSequential">是否按顺序创建。</param>
/// <returns>节点路径。</returns>
/// <remarks>
/// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
/// </remarks>
public static Task<string> CreateEphemeralAsync(this IZookeeperClient client, string path, byte[] data, List<ACL> acls, bool isSequential = false)
{
return client.CreateAsync(path, data, acls, CreateMode.EPHEMERAL);
return client.CreateAsync(path, data, acls, isSequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL);
}

public static Task CreatePersistentAsync(this IZookeeperClient client, string path, byte[] data)
/// <summary>
/// 创建节点。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="isSequential">是否按顺序创建。</param>
/// <returns>节点路径。</returns>
/// <remarks>
/// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
/// </remarks>
public static Task<string> CreatePersistentAsync(this IZookeeperClient client, string path, byte[] data, bool isSequential = false)
{
return client.CreatePersistentAsync(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
return client.CreatePersistentAsync(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, isSequential);
}

public static Task CreatePersistentAsync(this IZookeeperClient client, string path, byte[] data, List<ACL> acls)
/// <summary>
/// 创建节点。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="acls">权限。</param>
/// <param name="isSequential">是否按顺序创建。</param>
/// <returns>节点路径。</returns>
/// <remarks>
/// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
/// </remarks>
public static Task<string> CreatePersistentAsync(this IZookeeperClient client, string path, byte[] data, List<ACL> acls, bool isSequential = false)
{
return client.CreateAsync(path, data, acls, CreateMode.PERSISTENT);
return client.CreateAsync(path, data, acls, isSequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT);
}

/// <summary>
/// 递归删除该节点下的所有子节点和该节点本身。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <returns>如果成功则返回true,false。</returns>
public static async Task<bool> DeleteRecursiveAsync(this IZookeeperClient client, string path)
{
IEnumerable<string> children;
Expand All @@ -93,7 +217,7 @@ public static async Task<bool> DeleteRecursiveAsync(this IZookeeperClient client
return true;
}

foreach (string subPath in children)
foreach (var subPath in children)
{
if (!await client.DeleteRecursiveAsync(path + "/" + subPath))
{
Expand All @@ -104,25 +228,52 @@ public static async Task<bool> DeleteRecursiveAsync(this IZookeeperClient client
return true;
}

public static Task CreateRecursiveAsync(this IZookeeperClient client, string path, byte[] data, CreateMode createMode)
/// <summary>
/// 递归创建该节点下的所有子节点和该节点本身。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
public static Task CreateRecursiveAsync(this IZookeeperClient client, string path, byte[] data)
{
return client.CreateRecursiveAsync(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
}

/// <summary>
/// 递归创建该节点下的所有子节点和该节点本身。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="data">节点数据。</param>
/// <param name="acls">权限。</param>
public static Task CreateRecursiveAsync(this IZookeeperClient client, string path, byte[] data, List<ACL> acls)
{
return client.CreateRecursiveAsync(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
return client.CreateRecursiveAsync(path, p => data, p => acls);
}

public static async Task CreateRecursiveAsync(this IZookeeperClient client, string path, byte[] data, List<ACL> acls, CreateMode createMode)
/// <summary>
/// 递归创建该节点下的所有子节点和该节点本身。
/// </summary>
/// <param name="client">ZooKeeper客户端。</param>
/// <param name="path">节点路径。</param>
/// <param name="getNodeData">获取当前被创建节点数据的委托。</param>
/// <param name="getNodeAcls">获取当前被创建节点权限的委托。</param>
public static async Task CreateRecursiveAsync(this IZookeeperClient client, string path, Func<string, byte[]> getNodeData, Func<string, List<ACL>> getNodeAcls)
{
var data = getNodeData(path);
var acls = getNodeAcls(path);
try
{
await client.CreateAsync(path, data, acls, createMode);
await client.CreateAsync(path, data, acls, CreateMode.PERSISTENT);
}
catch (KeeperException.NodeExistsException)
{
}
catch (KeeperException.NoNodeException)
{
var parentDir = path.Substring(0, path.LastIndexOf('/'));
await CreateRecursiveAsync(client, parentDir, null, acls, CreateMode.PERSISTENT);
await client.CreateAsync(path, data, acls, createMode);
await CreateRecursiveAsync(client, parentDir, getNodeData, getNodeAcls);
await client.CreateAsync(path, data, acls, CreateMode.PERSISTENT);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Rabbit.Zookeeper/Implementation/NodeEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public async Task<bool> ExistsAsync(bool watch = false)
return data != null;
}

public async Task CreateAsync(byte[] data, List<ACL> acls, CreateMode createMode)
public async Task<string> CreateAsync(byte[] data, List<ACL> acls, CreateMode createMode)
{
var zooKeeper = _client.ZooKeeper;
await zooKeeper.createAsync(Path, data, acls, createMode);
return await zooKeeper.createAsync(Path, data, acls, createMode);
}

public Task<Stat> SetDataAsync(byte[] data, int version = -1)
Expand Down
Loading

0 comments on commit bae01cf

Please sign in to comment.