Merge pull request #14 from GlobalNOC/1.2.2-dev

1.2.2 dev

JoshuaMcNamara98 authored Feb 3, 2024
2 parents 706d751 + a5057bb commit 3c6c4d8
Showing 4 changed files with 151 additions and 109 deletions.
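In short: 1.2.2 adds a third RabbitMQ queue for aggregations that fail. Aggregation errors that previously aborted an entire batch in _consume_messages are now caught per message inside _aggregate_messages; the offending messages are logged, converted to plain hashes, and published as JSON to a configurable failed queue, while the rest of the batch still reaches the finished queue. The version is bumped from 1.2.1 to 1.2.2 in the spec file and the Perl module.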
1 change: 1 addition & 0 deletions conf/config.xml.example
@@ -63,6 +63,7 @@
<port>5672</port>
<pending-queue>timeseries_pending_aggregate</pending-queue>
<finished-queue>timeseries_finished_aggregate</finished-queue>
+<failed-queue>timeseries_failed_aggregate</failed-queue>
</rabbit>

</config>
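The worker reads this new element at startup via the /config/rabbit/failed-queue path and declares the queue alongside the existing pending and finished queues (see the _rabbit_connect changes in Worker.pm below).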
2 changes: 1 addition & 1 deletion grnoc-tsds-aggregate.spec
@@ -1,6 +1,6 @@
Summary: GRNOC TSDS Aggregate
Name: grnoc-tsds-aggregate
-Version: 1.2.1
+Version: 1.2.2
Release: 1%{?dist}
License: GRNOC
Group: Measurement
2 changes: 1 addition & 1 deletion lib/GRNOC/TSDS/Aggregate.pm
@@ -3,6 +3,6 @@ package GRNOC::TSDS::Aggregate;
use strict;
use warnings;

-our $VERSION = "1.2.1";
+our $VERSION = "1.2.2";

1;
255 changes: 148 additions & 107 deletions lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm
@@ -26,6 +26,7 @@ use constant QUEUE_FETCH_TIMEOUT => 10 * 1000;
use constant RECONNECT_TIMEOUT => 10;
use constant PENDING_QUEUE_CHANNEL => 1;
use constant FINISHED_QUEUE_CHANNEL => 2;
+use constant FAILED_QUEUE_CHANNEL => 3;
use constant SERVICE_CACHE_FILE => '/etc/grnoc/name-service-cacher/name-service.xml';
use constant COOKIES_FILE => '/var/lib/grnoc/tsds/aggregate/cookies.dat';

@@ -322,7 +323,19 @@ sub _consume_messages {

    try {

-        $self->_aggregate_messages( $aggregates_to_process ) if ( @$aggregates_to_process > 0 );
+        # declare first, then assign conditionally ("my ... if" is unreliable in Perl
+        # and would leave $results undefined when there is nothing to process)
+        my $results = { 'failed_messages' => [] };
+        $results = $self->_aggregate_messages( $aggregates_to_process ) if ( @$aggregates_to_process > 0 );
+
+        # push any failed messages to the failed queue
+        my $failed_messages = $results->{'failed_messages'};
+        if ( @$failed_messages > 0 ) {
+            $self->logger->error( "Failed to aggregate " . @$failed_messages . " messages." );
+            $self->rabbit->publish(
+                FAILED_QUEUE_CHANNEL,
+                $self->config->get( '/config/rabbit/failed-queue' ),
+                $self->json->encode( $failed_messages ),
+                {'exchange' => ''}
+            );
+        }
    }

    catch {
@@ -339,141 +352,162 @@ sub _aggregate_messages {
    my ( $self, $messages ) = @_;

    my $finished_messages = [];
+   my $results = {'failed_messages' => []};

    foreach my $message ( @$messages ) {

+       try {
            my $type = $message->type;
            my $from = $message->interval_from;
            my $to = $message->interval_to;
            my $start = $message->start;
            my $end = $message->end;
            my $meta = $message->meta;
            my $values = $message->values;
            my $required_meta = $message->required_meta;

            # align to aggregation window we're getting data for
            $start = nlowmult( $to, $start );
            $end = nhimult( $to, $end );

            my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta,
                                                                 meta => $meta );

            my $hist_mappings = $self->_get_histogram_mappings( $values );

            # craft the query needed to fetch the data from the necessary interval
            my $from_clause = "from $type";
            my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta );
            my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to );
            my $where_clause = $self->_get_where_clause( $meta );
            my $by_clause = $self->_get_by_clause( $required_meta );
            my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause";

            # issue the query to the webservice to retrieve the data we need to aggregate
            $self->websvc->set_raw_output(1);
            my $results = $self->websvc->query( query => $query,
                                                output => 'bson' );

            # handle any errors attempting to query the webservice
            if ( !$results || $self->websvc->get_error() ) {

                die( "Error querying TSDS web service: " . $self->websvc->get_error() );
            }

            $results = MongoDB::BSON->new()->decode_one( $results );

            if ( $results->{'error'} ) {

                die( "Error retrieving data from TSDS: " . $results->{'error_text'} );
            }

            $results = $results->{'results'};

            my $buckets = {};
            my $meta_info = {};

            foreach my $result ( @$results ) {

                my @value_types = keys( %$result );
                my $meta_data = {};
                my @meta_keys;

                # the required fields are not one of the possible value types
                # we're also going to omit anything that came back as a result of
                # aggregation
                foreach my $required ( @$required_meta ) {

                    @value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types;
                    $meta_data->{$required} = $result->{$required};
                    push( @meta_keys, $result->{$required} );
                }

                my $key = join( '__', @meta_keys );
                $meta_info->{$key} = $meta_data;

                # Put all of the data points into their respective floored
                # buckets
                foreach my $value_type ( @value_types ) {

                    my $entries = $result->{$value_type};

                    next if ( !defined( $entries ) );

                    # Figure this out once, makes it easier later in the code to
                    # refer to a consistent flag
                    my $is_aggregate = exists( $result->{$value_type . "__max"} ) ? 1 : 0;

                    my $entries_max = $result->{$value_type . "__max"} || [];
                    my $entries_min = $result->{$value_type . "__min"} || [];
                    my $entries_hist = $result->{$value_type . "__hist"} || [];

                    for ( my $i = 0; $i < @$entries; $i++ ) {

                        my $entry = $entries->[$i];

                        my ( $timestamp, $value ) = @$entry;

                        my $bucket = $to * int( $timestamp / $to );

                        push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate,
                                                                          avg => $value,
                                                                          min => $entries_min->[$i][1],
                                                                          max => $entries_max->[$i][1],
                                                                          hist => $entries_hist->[$i][1],
                                                                          timestamp => $timestamp} );
                    }
                }
            }

            # handle every measurement that was bucketed
            my @keys = keys( %$buckets );

            foreach my $key ( @keys ) {

                # grab meta data hash to pass for this measurement
                my $meta_data = $meta_info->{$key};

                # handle every bucketed timestamp for this measurement
                my @timestamps = keys( %{$buckets->{$key}} );

                foreach my $time ( @timestamps ) {

                    # all the data during this bucket to aggregate for this measurement
                    my $data = $buckets->{$key}{$time};

                    my $aggregated = $self->_aggregate( data => $data,
                                                        required_meta => $required_meta,
                                                        hist_mappings => $hist_mappings,
                                                        hist_min_max_mappings => $min_max_mappings,
                                                        key => $key );
                    $aggregated->{'type'} = "$type.aggregate";
                    $aggregated->{'time'} = $time;
                    $aggregated->{'interval'} = $to;
                    $aggregated->{'meta'} = $meta_data;

                    push( @$finished_messages, $aggregated );
                }
            }
+       }
+       catch {
+           # any failed aggregates are not added to 'finished_messages'
+           # and are instead pushed to a failed queue
+           $self->logger->error( "Error aggregating message: $_" );
+
+           # Convert Message object to hash (for encoding to JSON later);
+           # push the reference, not the flattened hash
+           my %failed_message = (
+               type => $message->type,
+               interval_from => $message->interval_from,
+               interval_to => $message->interval_to,
+               start => $message->start,
+               end => $message->end,
+               meta => $message->meta,
+               values => $message->values,
+               required_meta => $message->required_meta
+           );
+           push( @{$results->{'failed_messages'}}, \%failed_message );
+       }
    }

    my $num = @$finished_messages;
@@ -487,6 +521,8 @@ sub _aggregate_messages {

        $self->rabbit->publish( FINISHED_QUEUE_CHANNEL, $queue, $self->json->encode( \@finished_messages ), {'exchange' => ''} );
    }
+
+    return $results;
}

sub _aggregate {
@@ -785,6 +821,7 @@ sub _rabbit_connect {
my $rabbit_port = $self->config->get( '/config/rabbit/port' );
my $rabbit_pending_queue = $self->config->get( '/config/rabbit/pending-queue' );
my $rabbit_finished_queue = $self->config->get( '/config/rabbit/finished-queue' );
+my $rabbit_failed_queue = $self->config->get( '/config/rabbit/failed-queue' );

while ( 1 ) {

@@ -808,6 +845,10 @@
$rabbit->channel_open( FINISHED_QUEUE_CHANNEL );
$rabbit->queue_declare( FINISHED_QUEUE_CHANNEL, $rabbit_finished_queue, {'auto_delete' => 0} );

+# open channel to the failed aggregate queue we'll send to
+$rabbit->channel_open( FAILED_QUEUE_CHANNEL );
+$rabbit->queue_declare( FAILED_QUEUE_CHANNEL, $rabbit_failed_queue, {'auto_delete' => 0} );

$self->_set_rabbit( $rabbit );

$connected = 1;

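For illustration only (not part of this commit): a minimal sketch of a script that drains the failed queue for inspection or replay. It assumes Net::AMQP::RabbitMQ, whose channel_open/queue_declare/get calls match the client style the worker uses, plus the example queue name from conf/config.xml.example; the host, credentials, and the reprocessing step are placeholders.

#!/usr/bin/perl
# Hypothetical drain script for the failed-aggregate queue; a sketch only.
use strict;
use warnings;

use Net::AMQP::RabbitMQ;
use JSON::XS qw( decode_json );

my $channel = 1;
my $queue   = 'timeseries_failed_aggregate';   # queue name from config.xml.example

my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect( 'localhost', { user => 'guest', password => 'guest' } );   # placeholder host/credentials
$mq->channel_open( $channel );
$mq->queue_declare( $channel, $queue, { 'auto_delete' => 0 } );

# get() returns undef once the queue is empty, ending the loop;
# each body is a JSON array of failed aggregate hashes
while ( my $frame = $mq->get( $channel, $queue ) ) {

    my $failed = decode_json( $frame->{'body'} );

    foreach my $message ( @$failed ) {

        # inspect, log, or re-publish to the pending queue here
        print "failed aggregate: type=$message->{'type'} start=$message->{'start'} end=$message->{'end'}\n";
    }
}

$mq->disconnect();

Each queue entry carries the same fields the catch block captures (type, interval_from, interval_to, start, end, meta, values, required_meta), so a replay tool could re-publish the entries to the pending queue largely unchanged.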