From 0d7ce5e67a039fa49fcae1d83af5b3cf1642cf55 Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Sun, 29 Jan 2023 16:54:42 +0800 Subject: [PATCH] producer: refactor buffer impl --- src/hstreamdb/src/producer.rs | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/hstreamdb/src/producer.rs b/src/hstreamdb/src/producer.rs index 9efeed1..7c226f8 100644 --- a/src/hstreamdb/src/producer.rs +++ b/src/hstreamdb/src/producer.rs @@ -241,8 +241,11 @@ impl Producer { } Ok(shard_id) => match self.shard_buffer.get_mut(&shard_id) { None => { - let mut buffer_state: BufferState = default(); - buffer_state.modify(&record); + let buffer_state: BufferState = { + let mut buffer_state = BufferState::default(); + buffer_state.modify(&record); + buffer_state + }; self.shard_buffer_state.insert(shard_id, buffer_state); self.shard_buffer.insert(shard_id, vec![record]); self.shard_buffer_result @@ -263,18 +266,22 @@ impl Producer { } } Some(buffer) => { - self.shard_buffer_result - .get_mut(&shard_id) - .unwrap() - .push(result_sender); - let buffer_state = self.shard_buffer_state.get_mut(&shard_id).unwrap(); - buffer_state.modify(&record); - buffer.push(record); - - if buffer_state.check(&self.flush_settings) { - self.flush(shard_id).await.unwrap_or_else(|err| { - log::error!("producer flush error: shard_id = {shard_id}, {err}") - }); + if buffer.is_empty() { + todo!() + } else { + self.shard_buffer_result + .get_mut(&shard_id) + .unwrap() + .push(result_sender); + let buffer_state = self.shard_buffer_state.get_mut(&shard_id).unwrap(); + buffer_state.modify(&record); + buffer.push(record); + + if buffer_state.check(&self.flush_settings) { + self.flush(shard_id).await.unwrap_or_else(|err| { + log::error!("producer flush error: shard_id = {shard_id}, {err}") + }); + } } } },