Skip to content

Commit

Permalink
Make directories file:filename_all() types.
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Nov 26, 2024
1 parent 15c2ced commit 7841fae
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
21 changes: 12 additions & 9 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
-type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()).

-type effect() ::
{delete_snapshot, Dir :: file:filename(), ra_idxterm()} |
{delete_snapshot, Dir :: file:filename_all(), ra_idxterm()} |
{monitor, process, log, pid()} |
ra_snapshot:effect() |
ra_server:effect().
Expand Down Expand Up @@ -163,13 +163,16 @@
overview/0
]).

-define(SNAPSHOTS_DIR, <<"snapshots">>).
-define(CHECKPOINTS_DIR, <<"checkpoints">>).

pre_init(#{uid := UId,
system_config := #{data_dir := DataDir}} = Conf) ->
Dir = server_data_dir(DataDir, UId),
SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE),
MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
CheckpointsDir = filename:join(Dir, "checkpoints"),
SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR),
CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR),
_ = ra_snapshot:init(UId, SnapModule, SnapshotsDir,
CheckpointsDir, undefined, MaxCheckpoints),
ok.
Expand All @@ -192,8 +195,8 @@ init(#{uid := UId,
CPInterval = maps:get(min_checkpoint_interval, Conf,
?MIN_CHECKPOINT_INTERVAL),
MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
CheckpointsDir = filename:join(Dir, "checkpoints"),
SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR),
CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR),
Counter = maps:get(counter, Conf, undefined),

%% ensure directories are there
Expand Down Expand Up @@ -1048,8 +1051,8 @@ overview(#?MODULE{last_index = LastIndex,

-spec write_config(ra_server:config(), state()) -> ok.
write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
ConfigPath = filename:join(Dir, "config"),
TmpConfigPath = filename:join(Dir, "config.tmp"),
ConfigPath = filename:join(Dir, <<"config">>),
TmpConfigPath = filename:join(Dir, <<"config.tmp">>),
% clean config of potentially unserialisable data
Config = maps:without([parent,
counter,
Expand All @@ -1062,12 +1065,12 @@ write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
ok = prim_file:rename(TmpConfigPath, ConfigPath),
ok.

-spec read_config(state() | file:filename()) ->
-spec read_config(state() | file:filename_all()) ->
{ok, ra_server:config()} | {error, term()}.
read_config(#?MODULE{cfg = #cfg{directory = Dir}}) ->
read_config(Dir);
read_config(Dir) ->
ConfigPath = filename:join(Dir, "config"),
ConfigPath = filename:join(Dir, <<"config">>),
ra_lib:consult(ConfigPath).

-spec delete_everything(state()) -> ok.
Expand Down
15 changes: 8 additions & 7 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
-type writer_name_cache() :: {NextIntId :: non_neg_integer(),
#{writer_id() => binary()}}.

-record(conf, {dir :: string(),
-record(conf, {dir :: file:filename_all(),
segment_writer = ra_log_segment_writer :: atom() | pid(),
compute_checksums = false :: boolean(),
max_size_bytes :: non_neg_integer(),
Expand Down Expand Up @@ -370,21 +370,22 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
ok = ra_log_segment_writer:await(SegWriter),
post_boot
end,
WalFiles = lists:sort(filelib:wildcard(filename:join(Dir, "*.wal"))),
{ok, Files} = file:list_dir(Dir),
WalFiles = lists:sort([F || F <- Files,
filename:extension(F) == ".wal"]),
AllWriters =
[begin
FBase = filename:basename(F),
?DEBUG("wal: recovering ~ts, Mode ~s", [FBase, Mode]),
Fd = open_at_first_record(F),
?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]),
Fd = open_at_first_record(filename:join(Dir, F)),
{Time, #recovery{ranges = Ranges,
writers = Writers}} =
timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end),

ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, FBase),
ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F),

close_existing(Fd),
?DEBUG("wal: recovered ~ts time taken ~bms - Writer state recovered ~p",
[FBase, Time div 1000, Writers]),
[F, Time div 1000, Writers]),
Writers
end || F <- WalFiles],

Expand Down
28 changes: 14 additions & 14 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@
%% typically <data_dir>/snapshots
%% snapshot subdirs are store below
%% this as <data_dir>/snapshots/Term_Index
snapshot_directory :: file:filename(),
snapshot_directory :: file:filename_all(),
%% <data_dir>/checkpoints
%% like snapshots, these are also stored in subdirs
%% as <data_dir>/checkpoints/Term_Index
checkpoint_directory :: file:filename(),
checkpoint_directory :: file:filename_all(),
pending :: option({pid(), ra_idxterm(), kind()}),
accepting :: option(#accept{}),
current :: option(ra_idxterm()),
Expand All @@ -103,7 +103,7 @@
%% Runs in a separate process.
%% External storage should be available to read
%% `Sync' suggests whether the file should be synchronized with `fsync(1)'.
-callback write(Location :: file:filename(),
-callback write(Location :: file:filename_all(),
Meta :: meta(),
Ref :: term(),
Sync :: boolean()) ->
Expand All @@ -112,7 +112,7 @@
{error, file_err() | term()}.

%% Synchronizes the snapshot to disk.
-callback sync(Location :: file:filename()) ->
-callback sync(Location :: file:filename_all()) ->
ok |
{error, file_err() | term()}.

Expand All @@ -121,19 +121,19 @@
%% The read state should contain all the information required to read a chunk
%% The Context is the map returned by the context/0 callback
%% This can be used to inform the sender of receive capabilities.
-callback begin_read(Location :: file:filename(), Context :: map()) ->
-callback begin_read(Location :: file:filename_all(), Context :: map()) ->
{ok, Meta :: meta(), ReadState :: term()}
| {error, term()}.

%% Read a chunk of data from the snapshot using the read state
%% Returns a binary chunk of data and a continuation state
-callback read_chunk(ReadState,
ChunkSizeBytes :: non_neg_integer(),
Location :: file:filename()) ->
Location :: file:filename_all()) ->
{ok, Chunk :: term(), {next, ReadState} | last} | {error, term()}.

%% begin a stateful snapshot acceptance process
-callback begin_accept(SnapDir :: file:filename(),
-callback begin_accept(SnapDir :: file:filename_all(),
Meta :: meta()) ->
{ok, AcceptState :: term()} | {error, term()}.

Expand All @@ -149,15 +149,15 @@

%% Side-effect function
%% Recover machine state from file
-callback recover(Location :: file:filename()) ->
-callback recover(Location :: file:filename_all()) ->
{ok, Meta :: meta(), State :: term()} | {error, term()}.

%% validate the integrity of the snapshot
-callback validate(Location :: file:filename()) ->
-callback validate(Location :: file:filename_all()) ->
ok | {error, term()}.

%% Only read meta data from snapshot
-callback read_meta(Location :: file:filename()) ->
-callback read_meta(Location :: file:filename_all()) ->
{ok, meta()} |
{error, invalid_format |
{invalid_version, integer()} |
Expand All @@ -167,7 +167,7 @@

-callback context() -> map().

-spec init(ra_uid(), module(), file:filename(), file:filename(),
-spec init(ra_uid(), module(), file:filename_all(), file:filename_all(),
undefined | counters:counters_ref(), pos_integer()) ->
state().
init(UId, Module, SnapshotsDir, CheckpointDir, Counter, MaxCheckpoints) ->
Expand Down Expand Up @@ -339,7 +339,7 @@ accepting(#?MODULE{accepting = undefined}) ->
accepting(#?MODULE{accepting = #accept{idxterm = Accepting}}) ->
Accepting.

-spec directory(state(), kind()) -> file:filename().
-spec directory(state(), kind()) -> file:filename_all().
directory(#?MODULE{snapshot_directory = Dir}, snapshot) -> Dir;
directory(#?MODULE{checkpoint_directory = Dir}, checkpoint) -> Dir.

Expand Down Expand Up @@ -602,7 +602,7 @@ recover(#?MODULE{module = Mod,
end,
Mod:recover(Dir).

-spec read_meta(Module :: module(), Location :: file:filename()) ->
-spec read_meta(Module :: module(), Location :: file:filename_all()) ->
{ok, meta()} |
{error, invalid_format |
{invalid_version, integer()} |
Expand All @@ -613,7 +613,7 @@ read_meta(Module, Location) ->
Module:read_meta(Location).

-spec current_snapshot_dir(state()) ->
option(file:filename()).
option(file:filename_all()).
current_snapshot_dir(#?MODULE{snapshot_directory = Dir,
current = {Idx, Term}}) ->
make_snapshot_dir(Dir, Idx, Term);
Expand Down

0 comments on commit 7841fae

Please sign in to comment.