Recipes
Recipe — bulk RFQ triage
Process 50–500 CAD files at once, rank by unit cost, fan out to suppliers.
Your inbox just got hit with a 200-part RFQ. Manually quoting each takes a week. ARCNM triages the whole batch in minutes, not days.
By the end you'll have:
- Uploaded the whole batch in parallel.
- Ranked the parts by unit cost.
- Flagged the expensive / failed parts that need human review.
- Pre-routed the rest to your supplier fleet.
Architecture
A bounded-concurrency fan-out, then poll each calc to completion:
inbox/*.step ─→ upload-and-quote (lite tier) ─→ poll GET /{id} ─→ rank + route
Use the lite extraction tier for triage — it is the fastest path.
Re-quote the parts you'll actually send with the full tier so the
audit blob is as thorough as possible.
Step 1 — Fan out the upload + queue
Each file goes through POST /api/v1/parts/calculations/upload-and-quote
(202, multipart) and is then polled to a terminal status. Bound the
concurrency so you stay under your rate limits.
import asyncio, os
from pathlib import Path
import httpx
BASE = "https://api.arcnm.io/api/v1"
HEADERS = {"X-API-Key": os.environ["ARCNM_API_KEY"]}
ENV_ID = os.environ["ENV_ID"]
CONCURRENCY = 16
async def poll(client: httpx.AsyncClient, calc_id: str, timeout_s: int = 180) -> dict:
deadline = asyncio.get_event_loop().time() + timeout_s
while asyncio.get_event_loop().time() < deadline:
r = await client.get(f"{BASE}/parts/calculations/{calc_id}", headers=HEADERS)
r.raise_for_status()
calc = r.json()
if calc["status"] in ("succeeded", "failed", "cancelled", "timed_out"):
return calc
await asyncio.sleep(2)
raise TimeoutError(calc_id)
async def quote_one(client, sem, path: Path) -> dict:
async with sem:
with open(path, "rb") as fh:
r = await client.post(
f"{BASE}/parts/calculations/upload-and-quote",
headers=HEADERS,
data={
"costing_environment_id": ENV_ID,
"part_number": path.stem,
"lot_size": 10, # triage volume
"extraction_tier": "lite",
},
files={"cad_file": (path.name, fh, "application/step")},
)
r.raise_for_status()
return await poll(client, r.json()["id"])
async def main():
files = list(Path("./rfq/").glob("*.step"))
async with httpx.AsyncClient(timeout=30) as client:
sem = asyncio.Semaphore(CONCURRENCY)
results = await asyncio.gather(
*[quote_one(client, sem, p) for p in files],
return_exceptions=True,
)
for path, result in zip(files, results):
cost = result.get("unit_cost") if isinstance(result, dict) else str(result)
print(path.stem, "→", cost)
asyncio.run(main())
import pLimit from "p-limit"
import { readFile, readdir } from "node:fs/promises"
const BASE = "https://api.arcnm.io/api/v1"
const HEADERS = { "X-API-Key": process.env.ARCNM_API_KEY! }
const ENV_ID = process.env.ENV_ID!
const limit = pLimit(16)
async function poll(calcId: string, timeoutMs = 180_000): Promise<any> {
const deadline = Date.now() + timeoutMs
while (Date.now() < deadline) {
const r = await fetch(`${BASE}/parts/calculations/${calcId}`, { headers: HEADERS })
if (!r.ok) throw new Error(`${r.status}`)
const calc = await r.json()
if (["succeeded", "failed", "cancelled", "timed_out"].includes(calc.status)) return calc
await new Promise(res => setTimeout(res, 2_000))
}
throw new Error(`timeout ${calcId}`)
}
const files = (await readdir("./rfq")).filter(f => f.endsWith(".step"))
const results = await Promise.all(
files.map(name => limit(async () => {
const form = new FormData()
form.set("costing_environment_id", ENV_ID)
form.set("part_number", name.replace(/\.[^.]*$/, ""))
form.set("lot_size", "10")
form.set("extraction_tier", "lite")
form.set("cad_file", new Blob([await readFile(`./rfq/${name}`)]), name)
const r = await fetch(`${BASE}/parts/calculations/upload-and-quote`,
{ method: "POST", headers: HEADERS, body: form })
if (!r.ok) throw new Error(`${r.status} ${await r.text()}`)
return poll((await r.json()).id)
})),
)
results.forEach((res, i) => console.log(files[i], "→", res.unit_cost))
# One upload per file in parallel; print "<file> → <calc_id>".
# Poll each id afterwards (see Step 2) until it reaches a terminal status.
ls ./rfq/*.step | xargs -P 16 -I {} bash -c '
RESP=$(curl -s -X POST https://api.arcnm.io/api/v1/parts/calculations/upload-and-quote \
-H "X-API-Key: $ARCNM_API_KEY" \
-F "costing_environment_id=$ENV_ID" \
-F "part_number=$(basename "{}" .step)" \
-F "cad_file=@{};type=application/step" \
-F "lot_size=10" \
-F "extraction_tier=lite")
echo "{} → $(echo "$RESP" | jq -r .id)"
'
Concurrency 16 is a reasonable default. Watch for 429 responses and
back off with exponential backoff + jitter (the API doesn't emit a
Retry-After header). See Rate limits.
Step 2 — Rank by unit cost
After the calcs land, sort the succeeded ones by unit_cost and
collect the failures separately:
succeeded = [(p, r) for p, r in zip(files, results)
if isinstance(r, dict) and r["status"] == "succeeded"]
failed = [(p, r) for p, r in zip(files, results)
if not (isinstance(r, dict) and r["status"] == "succeeded")]
succeeded.sort(key=lambda pr: pr[1]["unit_cost"])
print(f"{len(succeeded)} priced, {len(failed)} need attention\n")
for path, calc in succeeded[:10]:
print(f" {calc['unit_cost']:>8.2f} {calc['currency']} {path.stem}")
You can also pull the batch back later with
GET /api/v1/parts/calculations?limit=200&status_filter=succeeded
({ "items": [...], "count": N }). The list projection includes
id, unit_cost, currency, lot_size, and status per row.
Step 3 — Flag parts for human review
Route the failures and the top-of-range prices to a human estimator.
A failed calc carries an error object; a high relative price often
signals an unusual part:
def needs_review(calc: dict, median_cost: float) -> bool:
if calc["status"] != "succeeded":
return True # extraction / pricing failed
return calc["unit_cost"] > 3 * median_cost # cost outlier vs the batch
costs = sorted(c["unit_cost"] for _, c in succeeded)
median = costs[len(costs) // 2] if costs else 0.0
for path, calc in zip(files, results):
if isinstance(calc, dict) and needs_review(calc, median):
send_to_estimator(path, calc)
elif not isinstance(calc, dict):
send_to_estimator(path, {"error": str(calc)})
A failed status usually means one of:
- The CAD or drawing could not be parsed (
error.details.retryabletells you whether a re-run is worth trying). - The detected discipline isn't one the engine prices yet
(
unsupported_discipline). - The material reference didn't resolve (
material_not_resolved).
These are exactly the cases a human should look at.
Step 4 — Re-quote winners with the full tier
For the quotes you'll actually send to the customer, re-quote at the
full extraction tier so the PDF carries the most thorough audit
blob. Re-use the existing revision via the JSON /quote endpoint so
you don't re-upload:
finalists = succeeded[:10] # cheapest 10 from Step 2
for path, triage in finalists:
r = requests.post(
"https://api.arcnm.io/api/v1/parts/calculations/quote",
headers={"X-API-Key": os.environ["ARCNM_API_KEY"]},
json={
"part_revision_id": triage["part_revision_id"],
"costing_environment_id": os.environ["ENV_ID"],
"lot_size": triage["lot_size"],
"extraction_tier": "full", # upgrade
},
)
r.raise_for_status()
final = poll(r.json()["id"]) # poll helper from Step 1
Step 5 — Send finalists to suppliers
Map parts to your supplier pool and fan out. The supplier sees only
your arcanum_calc_id — keep the audit blob server-side:
import httpx
SUPPLIERS = {
"shop_a": "https://supplier-a.example.com/rfq",
"shop_b": "https://supplier-b.example.com/rfq",
}
async def send_to_supplier(client, url, parts):
payload = [{"part_number": p.stem, "arcanum_calc_id": c["id"]} for p, c in parts]
await client.post(url, json=payload)
async with httpx.AsyncClient() as client:
await asyncio.gather(*[
send_to_supplier(client, url, finalists)
for url in SUPPLIERS.values()
])
If a supplier needs to see the price, fetch it from
GET /api/v1/parts/calculations/{id} with a parts:read key and pass
on just the unit_cost + currency — the analytics blob stays with
you.
Cost
Each run places a wallet hold (cost + margin) when it is enqueued; the
lite tier is cheaper than full. Failed requests and idempotent
replays don't bill. For current per-run rates and your live spend,
see the pricing page and read your usage with a
wallet:read-scoped key.
See also
- Pricing & billing — how holds and the wallet work.
- Rate limits — concurrency ceilings and backoff guidance.
- Recipes → Quote a part — the single-part flow.
- Recipes → ERP integration — wire into SAP / Vault.