taiga_sync_sprint11_p2.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import requests
  2. import urllib3
  3. urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
  4. base_url = 'https://192.168.130.161/taiga/api/v1'
  5. auth = requests.post(f'{base_url}/auth', json={'type': 'normal', 'username': 'FrancoisLange', 'password': 'BTSai123'}, verify=False).json()
  6. headers = {'Authorization': f'Bearer {auth["auth_token"]}', 'Content-Type': 'application/json'}
  7. proj_id = 21
  8. print("Fetching Sprints...")
  9. milestones = requests.get(f'{base_url}/milestones?project={proj_id}', headers=headers, verify=False).json()
  10. sprint11 = next((m for m in milestones if m['name'] == 'Sprint 11'), None)
  11. if not sprint11:
  12. print("Sprint 11 not found! Exiting.")
  13. exit(1)
  14. sprint_id = sprint11['id']
  15. print(f"Sprint 11 ID: {sprint_id}")
  16. stories = [
  17. {"subject": "AI Dietary Restriction SQL Enforcement", "description": "Dynamically map User EAV profiles (Diabetes, Kosher, Halal, Christian) directly to SQL WHERE clauses to enforce medical boundaries before AI generation."},
  18. {"subject": "Zabbix Database Ingestion Telemetry", "description": "Build python script to fetch exact row counts from products_core and push SNMP traps to Zabbix Server 192.168.130.170."}
  19. ]
  20. for s in stories:
  21. payload = {
  22. "project": proj_id,
  23. "subject": s["subject"],
  24. "description": s["description"],
  25. "milestone": sprint_id
  26. }
  27. res = requests.post(f'{base_url}/userstories', json=payload, headers=headers, verify=False)
  28. if res.status_code == 201:
  29. us = res.json()
  30. print(f"Created US: {us['subject']}")
  31. # Create a task for it
  32. t_payload = {
  33. "project": proj_id,
  34. "subject": f"Execute: {us['subject']}",
  35. "user_story": us['id'],
  36. "milestone": sprint_id
  37. }
  38. requests.post(f'{base_url}/tasks', json=t_payload, headers=headers, verify=False)
  39. print("New User Stories populated!")
  40. # Get Closed status IDs
  41. us_statuses = requests.get(f'{base_url}/userstory-statuses?project={proj_id}', headers=headers, verify=False).json()
  42. task_statuses = requests.get(f'{base_url}/task-statuses?project={proj_id}', headers=headers, verify=False).json()
  43. closed_us_status = next((s['id'] for s in us_statuses if s['is_closed']), None)
  44. closed_task_status = next((s['id'] for s in task_statuses if s['is_closed']), None)
  45. # Update User Stories
  46. us_list = requests.get(f'{base_url}/userstories?project={proj_id}&milestone={sprint_id}', headers=headers, verify=False).json()
  47. for us in us_list:
  48. if us['status'] != closed_us_status:
  49. payload = {"status": closed_us_status, "version": us['version']}
  50. requests.patch(f'{base_url}/userstories/{us["id"]}', json=payload, headers=headers, verify=False)
  51. print(f"Closed User Story: {us['subject']}")
  52. # Update Tasks
  53. tasks = requests.get(f'{base_url}/tasks?project={proj_id}&milestone={sprint_id}', headers=headers, verify=False).json()
  54. for task in tasks:
  55. if task['status'] != closed_task_status:
  56. payload = {"status": closed_task_status, "version": task['version']}
  57. requests.patch(f'{base_url}/tasks/{task["id"]}', json=payload, headers=headers, verify=False)
  58. print(f"Closed Task: {task['subject']}")
  59. print("Sprint 11 successfully closed!")