Skip to content

Commit

Permalink
fix replication (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor authored Nov 23, 2024
1 parent 272e6b1 commit 2d89056
Showing 1 changed file with 80 additions and 78 deletions.
158 changes: 80 additions & 78 deletions dataprep-tools/filecoin/boost_create_deals.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def get_providers(
providers = environ.get("PROVIDERS")
assert providers, "PROVIDERS environment variable must be set if not using CID gravity"
providers = providers.split(",")
return provideres
return providers

head = get_chain_head()
start_epoch = head + start_epoch_head_offset
Expand Down Expand Up @@ -299,89 +299,91 @@ def create_deals(metadata_obj):
logging.info("creating deal for ")
logging.info(file_item)

providers = get_providers(
piece_cid=file_item["commp_piece_cid"],
size=file_item["file_size"],
padded_size=file_item["padded_size"],
)
logging.info(f"found providers: {providers}")

for provider in providers:
if file_item["commp_piece_cid"] not in replications:
replications[file_item["commp_piece_cid"]] = []

if file_item["commp_piece_cid"] in replications:
if provider in replications[file_item["commp_piece_cid"]]:
logging.info(
"skipping %s, already have deal with %s"
% (file_item["commp_piece_cid"], provider)
)
continue
while len(replications[file_item["commp_piece_cid"]]) < replication_factor:

if file_item["commp_piece_cid"] not in replications:
replications[file_item["commp_piece_cid"]] = []
elif (
len(replications[file_item["commp_piece_cid"]])
>= replication_factor
):
logging.info(
"skipping %s, already replicated %s times"
% (
file_item["commp_piece_cid"],
replications[file_item["commp_piece_cid"]],
)
)
continue

params = {
"provider": provider,
"commp": file_item["commp_piece_cid"],
"piece-size": file_item["padded_size"],
"car-size": file_item["file_size"],
"payload-cid": file_item["payload_cid"],
"storage-price": "0",
"start-epoch-head-offset": start_epoch_head_offset,
"verified": "true",
"duration": 1468800,
"wallet": environ.get("WALLET"),
}
deal_arg = "deal"
if online:
params["http-url"] = file_item["url"]
else:
deal_arg = "offline-deal"

logging.info(params)
cmd = ["boost", "--vv", "--json=1", deal_arg] + [
f"--{k}={v}" for k, v in params.items()
]

logging.info(cmd)

if dry_run:
out = '{ "dealUuid": "dry-run-uuid", "dealState": "dry-run-state"}'
else:
try:
out = check_output(cmd, text=True).strip()
except CalledProcessError as e:
logging.warning(f"WARNING: boost deal failed for {provider}:")
logging.warning(e)
continue
providers = get_providers(
piece_cid=file_item["commp_piece_cid"],
size=file_item["file_size"],
padded_size=file_item["padded_size"],
)
logging.info(f"found providers: {providers}")

logging.info(out)
res = json.loads(out)

deal_output = {
"provider": provider,
"deal_uuid": res.get("dealUuid"),
}
for provider in providers:
if file_item["commp_piece_cid"] in replications:
if provider in replications[file_item["commp_piece_cid"]]:
logging.info(
"skipping %s, already have deal with %s"
% (file_item["commp_piece_cid"], provider)
)
continue

replications[file_item["commp_piece_cid"]].append(provider)
if (len(replications[file_item["commp_piece_cid"]])
>= replication_factor
):
logging.info(
"skipping %s, already replicated %s times"
% (
file_item["commp_piece_cid"],
replications[file_item["commp_piece_cid"]],
)
)
continue

deal_output.update(file_item)
csv_writer.writerow(deal_output)
if provider not in deals_providers:
deals_providers[provider] = []
deals_providers[provider].append(deal_output)
log_file.close()
params = {
"provider": provider,
"commp": file_item["commp_piece_cid"],
"piece-size": file_item["padded_size"],
"car-size": file_item["file_size"],
"payload-cid": file_item["payload_cid"],
"storage-price": "0",
"start-epoch-head-offset": start_epoch_head_offset,
"verified": "true",
"duration": 1468800,
"wallet": environ.get("WALLET"),
}
deal_arg = "deal"
if online:
params["http-url"] = file_item["url"]
else:
deal_arg = "offline-deal"

logging.info(params)
cmd = ["boost", "--vv", "--json=1", deal_arg] + [
f"--{k}={v}" for k, v in params.items()
]

logging.info(cmd)

if dry_run:
out = '{ "dealUuid": "dry-run-uuid", "dealState": "dry-run-state"}'
else:
try:
out = check_output(cmd, text=True).strip()
except CalledProcessError as e:
logging.warning(f"WARNING: boost deal failed for {provider}:")
logging.warning(e)
continue

logging.info(out)
res = json.loads(out)

deal_output = {
"provider": provider,
"deal_uuid": res.get("dealUuid"),
}

replications[file_item["commp_piece_cid"]].append(provider)

deal_output.update(file_item)
csv_writer.writerow(deal_output)
if provider not in deals_providers:
deals_providers[provider] = []
deals_providers[provider].append(deal_output)
log_file.close()

if dry_run:
logging.info("completed processing dry run mode")
Expand Down

0 comments on commit 2d89056

Please sign in to comment.