From 2d89056c290c1f0c026ac66deed849daee28f1f5 Mon Sep 17 00:00:00 2001 From: Anjor Kanekar Date: Sat, 23 Nov 2024 08:03:42 +0000 Subject: [PATCH] fix replication (#187) --- dataprep-tools/filecoin/boost_create_deals.py | 158 +++++++++--------- 1 file changed, 80 insertions(+), 78 deletions(-) diff --git a/dataprep-tools/filecoin/boost_create_deals.py b/dataprep-tools/filecoin/boost_create_deals.py index f7d89c88..e8905d35 100644 --- a/dataprep-tools/filecoin/boost_create_deals.py +++ b/dataprep-tools/filecoin/boost_create_deals.py @@ -139,7 +139,7 @@ def get_providers( providers = environ.get("PROVIDERS") assert providers, "PROVIDERS environment variable must be set if not using CID gravity" providers = providers.split(",") - return provideres + return providers head = get_chain_head() start_epoch = head + start_epoch_head_offset @@ -299,89 +299,91 @@ def create_deals(metadata_obj): logging.info("creating deal for ") logging.info(file_item) - providers = get_providers( - piece_cid=file_item["commp_piece_cid"], - size=file_item["file_size"], - padded_size=file_item["padded_size"], - ) - logging.info(f"found providers: {providers}") - - for provider in providers: + if file_item["commp_piece_cid"] not in replications: + replications[file_item["commp_piece_cid"]] = [] - if file_item["commp_piece_cid"] in replications: - if provider in replications[file_item["commp_piece_cid"]]: - logging.info( - "skipping %s, already have deal with %s" - % (file_item["commp_piece_cid"], provider) - ) - continue + while len(replications[file_item["commp_piece_cid"]]) < replication_factor: - if file_item["commp_piece_cid"] not in replications: - replications[file_item["commp_piece_cid"]] = [] - elif ( - len(replications[file_item["commp_piece_cid"]]) - >= replication_factor - ): - logging.info( - "skipping %s, already replicated %s times" - % ( - file_item["commp_piece_cid"], - replications[file_item["commp_piece_cid"]], - ) - ) - continue - - params = { - "provider": provider, - "commp": file_item["commp_piece_cid"], - "piece-size": file_item["padded_size"], - "car-size": file_item["file_size"], - "payload-cid": file_item["payload_cid"], - "storage-price": "0", - "start-epoch-head-offset": start_epoch_head_offset, - "verified": "true", - "duration": 1468800, - "wallet": environ.get("WALLET"), - } - deal_arg = "deal" - if online: - params["http-url"] = file_item["url"] - else: - deal_arg = "offline-deal" - - logging.info(params) - cmd = ["boost", "--vv", "--json=1", deal_arg] + [ - f"--{k}={v}" for k, v in params.items() - ] - - logging.info(cmd) - - if dry_run: - out = '{ "dealUuid": "dry-run-uuid", "dealState": "dry-run-state"}' - else: - try: - out = check_output(cmd, text=True).strip() - except CalledProcessError as e: - logging.warning(f"WARNING: boost deal failed for {provider}:") - logging.warning(e) - continue + providers = get_providers( + piece_cid=file_item["commp_piece_cid"], + size=file_item["file_size"], + padded_size=file_item["padded_size"], + ) + logging.info(f"found providers: {providers}") - logging.info(out) - res = json.loads(out) - deal_output = { - "provider": provider, - "deal_uuid": res.get("dealUuid"), - } + for provider in providers: + if file_item["commp_piece_cid"] in replications: + if provider in replications[file_item["commp_piece_cid"]]: + logging.info( + "skipping %s, already have deal with %s" + % (file_item["commp_piece_cid"], provider) + ) + continue - replications[file_item["commp_piece_cid"]].append(provider) + if (len(replications[file_item["commp_piece_cid"]]) + >= replication_factor + ): + logging.info( + "skipping %s, already replicated %s times" + % ( + file_item["commp_piece_cid"], + replications[file_item["commp_piece_cid"]], + ) + ) + continue - deal_output.update(file_item) - csv_writer.writerow(deal_output) - if provider not in deals_providers: - deals_providers[provider] = [] - deals_providers[provider].append(deal_output) - log_file.close() + params = { + "provider": provider, + "commp": file_item["commp_piece_cid"], + "piece-size": file_item["padded_size"], + "car-size": file_item["file_size"], + "payload-cid": file_item["payload_cid"], + "storage-price": "0", + "start-epoch-head-offset": start_epoch_head_offset, + "verified": "true", + "duration": 1468800, + "wallet": environ.get("WALLET"), + } + deal_arg = "deal" + if online: + params["http-url"] = file_item["url"] + else: + deal_arg = "offline-deal" + + logging.info(params) + cmd = ["boost", "--vv", "--json=1", deal_arg] + [ + f"--{k}={v}" for k, v in params.items() + ] + + logging.info(cmd) + + if dry_run: + out = '{ "dealUuid": "dry-run-uuid", "dealState": "dry-run-state"}' + else: + try: + out = check_output(cmd, text=True).strip() + except CalledProcessError as e: + logging.warning(f"WARNING: boost deal failed for {provider}:") + logging.warning(e) + continue + + logging.info(out) + res = json.loads(out) + + deal_output = { + "provider": provider, + "deal_uuid": res.get("dealUuid"), + } + + replications[file_item["commp_piece_cid"]].append(provider) + + deal_output.update(file_item) + csv_writer.writerow(deal_output) + if provider not in deals_providers: + deals_providers[provider] = [] + deals_providers[provider].append(deal_output) + log_file.close() if dry_run: logging.info("completed processing dry run mode")