Skip to content
This repository has been archived by the owner on Jul 18, 2023. It is now read-only.

Commit

Permalink
Add RetentionPolicy support
Browse files Browse the repository at this point in the history
The retention policy parameter has been added to allow writing to non-default retention policies.
  • Loading branch information
StefanoRaggi committed Apr 14, 2020
1 parent 20f3735 commit fe0c50d
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ namespace InfluxDB.Collector.Configuration
{
public abstract class CollectorEmitConfiguration
{
public abstract CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null);
public abstract CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null);

public CollectorConfiguration InfluxDB(string serverBaseAddress, string database, string username = null, string password = null)
public CollectorConfiguration InfluxDB(string serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null)
{
return InfluxDB(new Uri(serverBaseAddress), database, username, password);
return InfluxDB(new Uri(serverBaseAddress), database, username, password, retentionPolicy);
}

public abstract CollectorConfiguration Emitter(Action<PointData[]> emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public PipelinedCollectorEmitConfiguration(
_configuration = configuration;
}

public override CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null)
public override CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null)
{
if (string.Compare(serverBaseAddress.Scheme, "udp", ignoreCase: true) == 0)
_client = new LineProtocolUdpClient(serverBaseAddress, database, username, password);
_client = new LineProtocolUdpClient(serverBaseAddress, database, username, password, retentionPolicy);
else
_client = new LineProtocolClient(serverBaseAddress, database, username, password);
_client = new LineProtocolClient(serverBaseAddress, database, username, password, retentionPolicy);
return _configuration;
}

Expand Down
11 changes: 7 additions & 4 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public class LineProtocolClient : LineProtocolClientBase
{
private readonly HttpClient _httpClient;

public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password)
public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password, retentionPolicy)
{
}

Expand All @@ -24,8 +24,9 @@ protected LineProtocolClient(
Uri serverBaseAddress,
string database,
string username,
string password)
:base(serverBaseAddress, database, username, password)
string password,
string retentionPolicy)
:base(serverBaseAddress, database, username, password, retentionPolicy)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand All @@ -42,6 +43,8 @@ protected override async Task<LineProtocolWriteResult> OnSendAsync(
CancellationToken cancellationToken = default(CancellationToken))
{
var endpoint = $"write?db={Uri.EscapeDataString(_database)}";
if (!string.IsNullOrWhiteSpace(_retentionPolicy))
endpoint += $"&rp={Uri.EscapeDataString(_retentionPolicy)}";
if (!string.IsNullOrEmpty(_username))
endpoint += $"&u={Uri.EscapeDataString(_username)}&p={Uri.EscapeDataString(_password)}";

Expand Down
5 changes: 3 additions & 2 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace InfluxDB.LineProtocol.Client
{
public abstract class LineProtocolClientBase : ILineProtocolClient
{
protected readonly string _database, _username, _password;
protected readonly string _database, _username, _password, _retentionPolicy;

protected LineProtocolClientBase(Uri serverBaseAddress, string database, string username, string password)
protected LineProtocolClientBase(Uri serverBaseAddress, string database, string username, string password, string retentionPolicy)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand All @@ -25,6 +25,7 @@ protected LineProtocolClientBase(Uri serverBaseAddress, string database, string
_database = database;
_username = username;
_password = password;
_retentionPolicy = retentionPolicy;
}

public Task<LineProtocolWriteResult> WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken))
Expand Down
5 changes: 3 additions & 2 deletions src/InfluxDB.LineProtocol/Client/LineProtocolUdpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public LineProtocolUdpClient(
Uri serverBaseAddress,
string database,
string username = null,
string password = null)
:base(serverBaseAddress, database, username, password)
string password = null,
string retentionPolicy = null)
:base(serverBaseAddress, database, username, password, retentionPolicy)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class MockLineProtocolClient : LineProtocolClient
{
}

private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database) : base(handler, serverBaseAddress, database, null, null)
private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database) : base(handler, serverBaseAddress, database, null, null, null)
{
Handler = handler;
BaseAddress = serverBaseAddress;
Expand Down

0 comments on commit fe0c50d

Please sign in to comment.