Python
Without Router - Current Approach
# Assume this is the existing logic, before Router integration
import requests
# Metadata of the target entity: its base URL and its identifier.
# NOTE(review): the "_from_cr" suffix presumably refers to the Central
# Registry - confirm.
entity_metadata_from_cr = {
    "baseUrl": "http://fip-1.dev.sahamati.org.in/fip-simulate",
    "id": "FIP-SIMULATOR",
}
def get_http_config(base_url, route, headers, data, method_type):
    """Build the keyword arguments later unpacked into ``requests.request``.

    Args:
        base_url: URL prefix of the target entity.
        route: API path; appended verbatim to ``base_url``.
        headers: HTTP headers to send.
        data: Request payload, stored under the ``data`` key.
        method_type: HTTP method name, e.g. ``"POST"``.

    Returns:
        A dict with the keys ``url``, ``headers``, ``data`` and ``method``.
    """
    return {
        "url": f"{base_url}{route}",
        "headers": headers,
        "data": data,
        "method": method_type,
    }
def execute_discovery_request():
    """Send the account discovery request straight to the entity's base URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: Any error raised while sending the request, checking the
            HTTP status or decoding the body is printed and then re-raised
            unchanged.
    """
    route = "/v2/Accounts/discover"
    config = get_http_config(
        base_url=entity_metadata_from_cr["baseUrl"],
        route=route,
        headers={},
        data={},
        method_type="POST",
    )
    try:
        # Bound the wait so an unresponsive host cannot hang the caller forever.
        response = requests.request(**config, timeout=30)
        # Treat 4xx/5xx as failures instead of parsing an error payload.
        response.raise_for_status()
        return response.json()
    except Exception as error:
        print(f"Error making discovery request: {error}")
        raise
def perform_another_operations(account_discover_response):
    """Placeholder for follow-up work; prints the discovery response."""
    print("Perform another operations with the response", account_discover_response)
if __name__ == "__main__":
    try:
        response = execute_discovery_request()
        perform_another_operations(response)
        print("Account discovery successful.")
    except Exception as error:
        # Top-level boundary: report the reason instead of discarding it.
        print(f"Account discovery failed. Reason: {error}")
    finally:
        print("Account discovery completed.")
With Router Integration - Request
import requests
import json
from datetime import datetime
from typing import Dict, Any
# ==============================
# Config Metadata
# ==============================
# Metadata of the recipient entity; "id" is sent to the Router Integration
# Helper as the recipientId.
ENTITY_METADATA_FROM_CR = {
    "baseUrl": "http://fip-1.sandbox.sahamati.org.in/fip-simulate",
    "id": "FIP-SIMULATOR",
}

# Router Integration Helper endpoint queried before each outgoing request.
ROUTER_INTEGRATION_HELPER_URL = (
    "https://api.sandbox.sahamati.org.in/router-helper/v1/request/header"
)
# ==============================
# Core Functions
# ==============================
def build_discovery_request_body() -> Dict[str, Any]:
    """Creates the ReBIT-defined account discovery request body.

    Returns:
        A dict with the keys ``ver``, ``timestamp``, ``txnid``, ``Customer``
        and ``FITypes``. ``timestamp`` is the current UTC time in ISO 8601
        form including the UTC offset, and ``txnid`` is a freshly generated
        UUID, so two calls never share a transaction id.
    """
    # Function-scope imports keep this block self-contained: the module header
    # only imports ``datetime`` itself.
    import uuid
    from datetime import datetime, timezone

    return {
        "ver": "2.0.0",
        # Timezone-aware replacement for the deprecated, naive utcnow().
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        # Unique per request; the previous fixed literal was not a valid UUID.
        "txnid": str(uuid.uuid4()),
        "Customer": {
            "id": "9766334467@aa_simulator",
            "Identifiers": [
                {
                    "category": "STRONG",
                    "type": "AADHAAR",
                    "value": "XXXXXXXXXXXXXXXX",
                },
            ],
        },
        "FITypes": ["DEPOSIT"],
    }
def get_http_config(
    base_url: str,
    route: str,
    headers: Dict[str, str],
    data: Dict[str, Any],
    method_type: str,
) -> Dict[str, Any]:
    """Builds base HTTP config.

    Args:
        base_url: URL prefix of the target.
        route: API path; appended verbatim to ``base_url``.
        headers: HTTP headers to send.
        data: Request payload, stored under the ``json`` key.
        method_type: HTTP method name, e.g. ``"POST"``.

    Returns:
        A dict with the keys ``url``, ``headers``, ``json`` and ``method``,
        later unpacked into ``requests.request``.
    """
    return {
        "url": f"{base_url}{route}",
        "headers": headers,
        "json": data,
        "method": method_type,
    }
def router_integration_helper(route: str, rebit_request_body: Dict[str, Any]) -> Dict[str, Any]:
    """Calls Router Integration Helper service to get host and x-request-meta.

    Args:
        route: ReBIT API endpoint path, sent as ``rebitAPIEndpoint``.
        rebit_request_body: The request body that will be sent to the
            recipient; forwarded as ``requestBody``.

    Returns:
        The decoded JSON body of the helper's response.

    Raises:
        requests.HTTPError: If the helper answers with a 4xx/5xx status.
    """
    # Use the customer id carried in the request body so the helper sees the
    # same customer as the recipient; fall back to the sample id otherwise.
    customer = rebit_request_body.get("Customer") if isinstance(rebit_request_body, dict) else None
    customer_id = customer.get("id") if isinstance(customer, dict) else None

    payload = {
        "rebitAPIEndpoint": route,
        "recipientId": ENTITY_METADATA_FROM_CR["id"],
        "customerId": customer_id or "96203773344@aa_simulator",
        "requestBody": rebit_request_body,
    }
    response = requests.post(
        ROUTER_INTEGRATION_HELPER_URL,
        json=payload,
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def add_sahamati_configuration(config: Dict[str, Any], route: str, rebit_request_body: Dict[str, Any]) -> Dict[str, Any]:
    """Updates config with router URL and adds x-request-meta header by calling Router Helper API.

    Args:
        config: HTTP config dict; it is modified in place.
        route: ReBIT API endpoint path, appended to the returned target host.
        rebit_request_body: Request body passed on to the Router Helper call.

    Returns:
        The same ``config`` object, with ``url`` replaced and, when the helper
        supplied one, an ``x-request-meta`` header added.

    Raises:
        RuntimeError: If the helper response has no ``targetHost`` value.
    """
    header_response = router_integration_helper(route, rebit_request_body)

    target_host = header_response.get("targetHost")
    if not target_host:
        raise RuntimeError("Router Helper response missing required field 'targetHost'")

    # Update URL
    config["url"] = f"{target_host}{route}"
    config.setdefault("headers", {})

    # Add x-request-meta header only if it exists
    x_request_meta = header_response.get("x-request-meta")
    if x_request_meta:
        config["headers"]["x-request-meta"] = x_request_meta

    return config
def execute_discovery_request() -> Dict[str, Any]:
    """Executes account discovery request via Sahamati Router (helper API flow).

    Returns:
        The decoded JSON body of the discovery response.

    Raises:
        requests.HTTPError: If the discovery call answers with a 4xx/5xx
            status. Errors from the Router Helper call propagate as well.
    """
    route = "/Accounts/discover"
    rebit_request_body = build_discovery_request_body()

    # Step 1: Base config. The URL built here is replaced in step 2 by the
    # target host returned from the Router Helper.
    config = get_http_config(
        base_url=ENTITY_METADATA_FROM_CR["baseUrl"],
        route=route,
        headers={},
        data=rebit_request_body,
        method_type="POST",
    )

    # Step 2: Add Router configuration (call helper API)
    config = add_sahamati_configuration(config, route, rebit_request_body)

    # Step 3: Execute request
    response = requests.request(**config, timeout=30)
    response.raise_for_status()
    return response.json()
def perform_another_operations(account_discover_response: Dict[str, Any]) -> None:
    """Placeholder for next steps; pretty-prints the discovery response as JSON."""
    print("Next steps with response:")
    print(json.dumps(account_discover_response, indent=2))
# ==============================
# Main Runner
# ==============================
if __name__ == "__main__":
    # Top-level boundary: any failure is reported with its reason, and the
    # completion message is printed either way.
    try:
        response = execute_discovery_request()
        perform_another_operations(response)
        print("Account discovery successful.")
    except Exception as e:
        print(f"Account discovery failed. Reason: {e}")
    finally:
        print("Account discovery completed.")
With Router Integration - Response
import json
import requests
from flask import Flask, request, jsonify, make_response
# Flask application object that the discovery route is registered on.
app = Flask(__name__)
# Router Integration Helper endpoint queried for the x-response-meta header.
ROUTER_RESPONSE_HEADER_URL = "https://api.sandbox.sahamati.org.in/router-helper/v1/response/header"
# Sent to the Router Integration Helper as "recipientId".
RECIPIENT_ID = "AA-SIMULATOR"
def fetch_account_from_request_body(request_body):
    """Return a sample account when the request carries a usable identifier.

    The body matches when ``request_body["Customer"]["Identifiers"]`` is a
    list containing at least one dict with ``category == "STRONG"``,
    ``type == "AADHAAR"`` and a non-empty ``value``.

    Args:
        request_body: Parsed JSON request body. Anything that is not a dict
            (``None``, list, string, number) is treated as "no match".

    Returns:
        A fixed sample account dict on a match, otherwise ``None``.
    """
    # Parsed JSON may be any JSON type; only an object can carry "Customer".
    if not isinstance(request_body, dict):
        return None

    customer = request_body.get("Customer")
    if not isinstance(customer, dict):
        return None

    identifiers = customer.get("Identifiers")
    if not isinstance(identifiers, list):
        return None

    for id_obj in identifiers:
        if not isinstance(id_obj, dict):
            continue
        if (
            id_obj.get("category") == "STRONG"
            and id_obj.get("type") == "AADHAAR"
            and id_obj.get("value")
        ):
            return {
                "FIType": "DEPOSIT",
                "accType": "SAVINGS",
                "accRefNumber": "BANK11111111",
                "maskedAccNumber": "XXXXXXX3468",
            }
    return None
def routerIntegrationHelper(request_body):
    """POST ``request_body`` as JSON to the Router response-header endpoint.

    Args:
        request_body: JSON-serialisable payload for the helper.

    Returns:
        The decoded JSON body of the helper's response.

    Raises:
        requests.HTTPError: If the helper answers with a 4xx/5xx status.
    """
    headers = {"Content-Type": "application/json"}
    resp = requests.post(
        ROUTER_RESPONSE_HEADER_URL,
        headers=headers,
        data=json.dumps(request_body),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
# NOTE(review): the request-side sample posts to "/Accounts/discover" (capital
# "A") and Flask URL rules are case-sensitive - confirm which casing the
# router forwards before relying on this path.
@app.route("/accounts/discover", methods=["POST"])
def handle_account_discovery():
    """Handle an account discovery request and attach the router response header.

    Returns:
        A 404 JSON response when no account matches the request body.
        Otherwise the discovery response body as JSON, with an
        ``x-response-meta`` header when the Router Helper supplied one.
    """
    # Function-scope imports keep this block self-contained: the module header
    # imports neither ``datetime`` nor ``uuid``.
    import uuid
    from datetime import datetime, timezone

    incoming_request_body = request.get_json(force=True)
    discovered_account = fetch_account_from_request_body(incoming_request_body)
    if not discovered_account:
        return make_response(jsonify({"error": "Account not found"}), 404)

    # A match implies the body is a dict whose "Customer" entry is a dict.
    # Echo the caller's transaction id; generate one only if it is missing.
    txnid = incoming_request_body.get("txnid") or str(uuid.uuid4())
    # Take the customer id from the request; fall back to the sample id.
    customer_id = incoming_request_body["Customer"].get("id") or "9977336577@aa_simulator"

    response_body = {
        "ver": "2.0.0",
        # The bare ``now()`` used previously was undefined (NameError).
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        "txnid": txnid,
        "DiscoveredAccounts": [discovered_account],
    }
    response_header_request_body = {
        "rebitAPIEndpoint": "/accounts/discover",
        "customerId": customer_id,
        "recipientId": RECIPIENT_ID,
        "additionalAttributes": {},
        "responseBody": response_body,
    }
    response_header_resp = routerIntegrationHelper(response_header_request_body)
    x_response_meta = response_header_resp.get("x-response-meta")

    response = make_response(jsonify(response_body))
    if x_response_meta:
        response.headers["x-response-meta"] = x_response_meta
    return response
if __name__ == "__main__":
    # NOTE(review): debug=True enables Flask's interactive debugger and
    # reloader; keep it to local development only.
    app.run(debug=True)
Last updated
Was this helpful?