guitarHeroButBetter/py/engine/prefabs/services.py
James Whiteman 3f7b409302 Python code
2026-01-30 21:16:42 -08:00

799 lines
25 KiB
Python

from __future__ import annotations
import json
import math
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from Box2D import (b2Body, b2CircleShape, b2EdgeShape, b2FixtureDef,
b2PolygonShape, b2Vec2, b2World)
import pyray as rl
from engine.framework import Service
from engine.math_extensions import v2
from engine.physics_debug import PhysicsDebugRenderer
from engine.raycasts import circle_hit, raycast_closest, rectangle_hit
from engine.LdtkJson import LdtkJSON, Level, LayerInstance, GridPoint
class MultiService(Service):
"""Service container for multiple services of the same base type.
Attributes:
services: Mapping of service name to instance.
"""
def __init__(self) -> None:
super().__init__()
self.services: Dict[str, Service] = {}
def init_service(self) -> None:
"""Initialize all contained services.
Returns:
None
"""
for service in self.services.values():
service.init()
super().init_service()
def update(self, delta_time: float) -> None:
"""Update all contained services.
Args:
delta_time: Seconds since the last frame.
Returns:
None
"""
for service in self.services.values():
service.update(delta_time)
super().update(delta_time)
def draw(self) -> None:
"""Draw all contained services.
Returns:
None
"""
for service in self.services.values():
service.draw()
def add_service(self, name: str, service_or_cls: Any, *args: Any, **kwargs: Any) -> Service:
"""Add a service instance or construct one from a class.
Args:
name: Name to register the service under.
service_or_cls: A Service instance or Service class.
*args: Positional args forwarded to the constructor.
**kwargs: Keyword args forwarded to the constructor.
Returns:
The service instance added.
"""
if isinstance(service_or_cls, Service):
service = service_or_cls
else:
service = service_or_cls(*args, **kwargs)
self.services[name] = service
return service
def get_service(self, name: str) -> Optional[Service]:
"""Get a service by name.
Args:
name: Registered name of the service.
Returns:
The service instance, or None if missing.
"""
return self.services.get(name)
class TextureService(Service):
"""Cache textures so they are loaded once.
Attributes:
textures: Mapping of filename to loaded Texture2D.
"""
def __init__(self) -> None:
super().__init__()
self.textures: Dict[str, rl.Texture2D] = {}
def get_texture(self, filename: str) -> rl.Texture2D:
"""Get or load a texture by filename.
Args:
filename: Path to the texture file.
Returns:
The loaded Texture2D.
"""
if filename not in self.textures:
self.textures[filename] = rl.load_texture(filename)
return self.textures[filename]
class SoundService(Service):
"""Cache sounds and create aliases for overlapping playback.
Attributes:
sounds: Mapping of filename to a list of Sound aliases.
"""
def __init__(self) -> None:
super().__init__()
self.sounds: Dict[str, List[Any]] = {}
def get_sound(self, filename: str):
"""Get or load a sound; returns an alias if already loaded.
Args:
filename: Path to the sound file.
Returns:
A Sound instance (original or alias).
"""
if filename not in self.sounds:
self.sounds[filename] = [rl.load_sound(filename)]
else:
self.sounds[filename].append(rl.load_sound_alias(self.sounds[filename][0]))
return self.sounds[filename][-1]
class PhysicsService(Service):
"""Service that owns the Box2D world and physics configuration."""
def __init__(self,
gravity: b2Vec2 = b2Vec2(0.0, 10.0),
time_step: float = 1.0 / 60.0,
sub_steps: int = 6,
meters_to_pixels: float = 30.0) -> None:
super().__init__()
self.gravity = gravity
self.time_step = time_step
self.sub_steps = sub_steps
self.meters_to_pixels = meters_to_pixels
self.pixels_to_meters = 1.0 / meters_to_pixels
self.world: Optional[b2World] = None
self.debug_draw = PhysicsDebugRenderer(meters_to_pixels=meters_to_pixels)
def init(self) -> None:
"""Create the Box2D world.
Returns:
None
"""
self.world = b2World(gravity=self.gravity, doSleep=True)
self.world.contactListener = None
self.world.renderer = self.debug_draw
def update(self, delta_time: float) -> None:
"""Step the physics world.
Args:
delta_time: Seconds since the last frame (unused by fixed-step).
Returns:
None
"""
if not self.world:
return
self.world.Step(self.time_step, self.sub_steps, self.sub_steps)
def draw_debug(self) -> None:
"""Draw debug shapes for the physics world.
Returns:
None
"""
if self.world:
self.world.DrawDebugData()
def convert_to_pixels(self, meters: b2Vec2) -> b2Vec2:
"""Convert meters to pixels.
Args:
meters: Vector in meters.
Returns:
Vector in pixels.
"""
return b2Vec2(meters.x * self.meters_to_pixels, meters.y * self.meters_to_pixels)
def convert_to_meters(self, pixels) -> b2Vec2:
"""Convert pixels to meters.
Args:
pixels: Vector in pixels.
Returns:
Vector in meters.
"""
return b2Vec2(pixels.x * self.pixels_to_meters, pixels.y * self.pixels_to_meters)
def convert_length_to_pixels(self, meters: float) -> float:
"""Convert a length in meters to pixels.
Args:
meters: Length in meters.
Returns:
Length in pixels.
"""
return meters * self.meters_to_pixels
def convert_length_to_meters(self, pixels: float) -> float:
"""Convert a length in pixels to meters.
Args:
pixels: Length in pixels.
Returns:
Length in meters.
"""
return pixels * self.pixels_to_meters
def raycast(self, ignore: Optional[b2Body], start, end):
"""Raycast in pixel units.
Args:
ignore: Body to ignore during raycast.
start: Start position in pixels.
end: End position in pixels.
Returns:
RayHit if world exists, otherwise None.
"""
if not self.world:
return None
origin = self.convert_to_meters(start)
translation = self.convert_to_meters(v2(end.x - start.x, end.y - start.y))
return raycast_closest(self.world, ignore, origin, translation)
def circle_overlap(self, center, radius: float, ignore_body: Optional[b2Body] = None):
"""Overlap query for a circle in pixel units.
Args:
center: Center in pixels.
radius: Radius in pixels.
ignore_body: Optional body to ignore.
Returns:
List of bodies overlapping the circle.
"""
if not self.world:
return []
center_m = self.convert_to_meters(center)
radius_m = self.convert_length_to_meters(radius)
return circle_hit(self.world, ignore_body, center_m, radius_m)
def rectangle_overlap(self, rectangle, rotation: float = 0.0, ignore_body: Optional[b2Body] = None):
"""Overlap query for a rectangle in pixel units.
Args:
rectangle: Rectangle in pixels.
rotation: Rotation in radians.
ignore_body: Optional body to ignore.
Returns:
List of bodies overlapping the rectangle.
"""
if not self.world:
return []
size = v2(rectangle.width, rectangle.height)
center = v2(rectangle.x + size.x / 2.0, rectangle.y + size.y / 2.0)
size_m = self.convert_to_meters(size)
center_m = self.convert_to_meters(center)
return rectangle_hit(self.world, ignore_body, center_m, size_m, rotation)
@dataclass(frozen=True)
class IntPoint:
x: int
y: int
class LdtkEntity:
"""Thin wrapper around an LDtk entity instance."""
def __init__(self, entity) -> None:
self.entity = entity
def getPosition(self) -> IntPoint:
"""Get entity position in pixels.
Returns:
IntPoint for the entity position.
"""
return IntPoint(self.entity.px[0], self.entity.px[1])
def getSize(self) -> IntPoint:
"""Get entity size in pixels.
Returns:
IntPoint for the entity size.
"""
return IntPoint(self.entity.width, self.entity.height)
def getField(self, name: str) -> Optional[Any]:
"""Get a field value by name.
Args:
name: Field identifier.
Returns:
Field value, converted for point fields when possible.
"""
for field in self.entity.field_instances:
if field.identifier == name:
value = field.value
if isinstance(value, dict) and "cx" in value and "cy" in value:
return IntPoint(int(value["cx"]), int(value["cy"]))
return value
return None
@dataclass
class LayerRenderer:
renderer: rl.RenderTexture
layer_iid: str
visible: bool = True
class LevelService(Service):
"""Service for loading and drawing LDtk levels and collisions.
Attributes:
project: Parsed LDtk project.
level: Active Level instance.
renderers: Render textures per layer.
layer_bodies: Physics bodies used for collision.
physics: PhysicsService reference.
"""
def __init__(self,
project_file: str,
level_name: str,
collision_names: List[str],
scale: float = 1.0) -> None:
super().__init__()
self.project_file = project_file
self.level_name = level_name
self.collision_names = collision_names
self.scale = scale
self.project: Optional[LdtkJSON] = None
self.level: Optional[Level] = None
self.renderers: List[LayerRenderer] = []
self.layer_bodies: List[b2Body] = []
self.physics: Optional[PhysicsService] = None
self.layer_defs_by_uid: Dict[int, Any] = {}
def init(self) -> None:
"""Load the LDtk project, build renderers and collision bodies.
Returns:
None
"""
if not rl.file_exists(self.project_file):
print(f"LDtk file not found: {self.project_file}")
raise RuntimeError("LDtk file not found")
with open(self.project_file, "r", encoding="utf-8") as handle:
project_data = json.load(handle)
self.project = LdtkJSON.from_dict(project_data)
level = None
for candidate in self.project.levels:
if candidate.identifier == self.level_name:
level = candidate
break
if level is None:
print(f"LDtk level not found: {self.level_name}")
raise RuntimeError("LDtk level not found")
if level.layer_instances is None and level.external_rel_path:
external_path = self._resolve_external_level_path(level.external_rel_path)
with open(external_path, "r", encoding="utf-8") as handle:
external_data = json.load(handle)
level = Level.from_dict(external_data)
self.level = level
self.layer_defs_by_uid = {layer.uid: layer for layer in self.project.defs.layers}
self.physics = self.scene.get_service(PhysicsService) if self.scene else None
if not self.physics:
print("PhysicsService required for LevelService")
raise RuntimeError("PhysicsService required")
texture_service = self.scene.get_service(TextureService)
for layer in self.level.layer_instances or []:
if layer.tileset_rel_path:
tileset_path = self._resolve_tileset_path(layer.tileset_rel_path)
texture = texture_service.get_texture(tileset_path)
renderer = rl.load_render_texture(self.level.px_wid, self.level.px_hei)
self._render_layer_tiles(layer, texture, renderer)
self.renderers.append(LayerRenderer(renderer=renderer, layer_iid=layer.iid, visible=layer.visible))
if layer.type == "IntGrid" and self.collision_names:
self._build_collision_for_layer(layer)
def _resolve_tileset_path(self, rel_path: str) -> str:
"""Resolve a tileset path relative to the project file.
Args:
rel_path: Relative tileset path from the LDtk project.
Returns:
Absolute or normalized tileset path.
"""
import os
directory = os.path.dirname(self.project_file)
return os.path.join(directory, rel_path).replace("\\", "/")
def _resolve_external_level_path(self, rel_path: str) -> str:
"""Resolve an external level path relative to the project file.
Args:
rel_path: Relative level path from the LDtk project.
Returns:
Absolute or normalized level path.
"""
import os
directory = os.path.dirname(self.project_file)
return os.path.join(directory, rel_path).replace("\\", "/")
def _render_layer_tiles(self, layer: LayerInstance, texture: rl.Texture2D, renderer: rl.RenderTexture) -> None:
"""Render the tiles for a layer to a render texture.
Args:
layer: Layer instance to render.
texture: Tileset texture.
renderer: Render texture target.
Returns:
None
"""
rl.begin_texture_mode(renderer)
rl.clear_background(rl.Color(0, 0, 0, 0))
tile_size = layer.grid_size
tiles = list(layer.grid_tiles) + list(layer.auto_layer_tiles)
for tile in tiles:
src_x, src_y = tile.src[0], tile.src[1]
flip_x = (tile.f & 1) != 0
flip_y = (tile.f & 2) != 0
src = rl.Rectangle(float(src_x), float(src_y),
float(tile_size) * (-1.0 if flip_x else 1.0),
float(tile_size) * (-1.0 if flip_y else 1.0))
dest = v2(float(tile.px[0] + layer.px_total_offset_x), float(tile.px[1] + layer.px_total_offset_y))
rl.draw_texture_rec(texture, src, dest, rl.WHITE)
rl.end_texture_mode()
def _intgrid_value_name(self, layer: LayerInstance, value: int) -> Optional[str]:
"""Map an IntGrid value to its identifier string.
Args:
layer: Layer instance with IntGrid definitions.
value: Raw IntGrid value.
Returns:
Identifier string or None if empty/unknown.
"""
if value == 0:
return None
layer_def = self.layer_defs_by_uid.get(layer.layer_def_uid)
if not layer_def:
return None
for def_value in layer_def.int_grid_values:
if def_value.value == value:
return def_value.identifier
return None
def _build_collision_for_layer(self, layer: LayerInstance) -> None:
"""Create boundary colliders for a collision layer.
Args:
layer: Layer instance to build colliders for.
Returns:
None
"""
if not self.physics or not self.physics.world:
return
world = self.physics.world
body = world.CreateStaticBody(position=(0, 0))
grid_w = layer.c_wid
grid_h = layer.c_hei
cell_size = float(layer.grid_size) * self.scale
# Build boundary edges into chain shapes to avoid internal collisions.
def is_solid(cx: int, cy: int) -> bool:
if cx < 0 or cy < 0 or cx >= grid_w or cy >= grid_h:
return False
idx = cy * grid_w + cx
value = layer.int_grid_csv[idx] if idx < len(layer.int_grid_csv) else 0
name = self._intgrid_value_name(layer, value)
return bool(name and name in self.collision_names)
def make_edge(a, b):
return (a, b) if a <= b else (b, a)
edges = set()
for y in range(grid_h):
for x in range(grid_w):
if not is_solid(x, y):
continue
if not is_solid(x, y - 1):
edges.add(make_edge((x, y), (x + 1, y)))
if not is_solid(x, y + 1):
edges.add(make_edge((x, y + 1), (x + 1, y + 1)))
if not is_solid(x - 1, y):
edges.add(make_edge((x, y), (x, y + 1)))
if not is_solid(x + 1, y):
edges.add(make_edge((x + 1, y), (x + 1, y + 1)))
adj: Dict[tuple, List[tuple]] = {}
for a, b in edges:
adj.setdefault(a, []).append(b)
adj.setdefault(b, []).append(a)
def erase_edge(a, b):
edges.discard(make_edge(a, b))
loops: List[List[tuple]] = []
while edges:
start_a, start_b = next(iter(edges))
start = start_a
cur = start_b
prev = start
poly = [start, cur]
erase_edge(start, cur)
while cur != start:
next_pt = None
for cand in adj.get(cur, []):
if cand == prev:
continue
if make_edge(cur, cand) in edges:
next_pt = cand
break
if next_pt is None:
break
prev, cur = cur, next_pt
poly.append(cur)
erase_edge(prev, cur)
if len(poly) > 100000:
break
if poly and poly[0] == poly[-1]:
poly.pop()
if len(poly) >= 3:
loops.append(poly)
for loop in loops:
verts = []
for cx, cy in loop:
x_px = cx * cell_size
y_px = cy * cell_size
verts.append(self.physics.convert_to_meters(v2(x_px, y_px)))
count = len(verts)
if count < 2:
continue
for i in range(count):
v1 = verts[i]
v2p = verts[(i + 1) % count]
edge = b2EdgeShape(vertices=[(float(v1.x), float(v1.y)), (float(v2p.x), float(v2p.y))])
body.CreateFixture(shape=edge, friction=0.1, restitution=0.1)
self.layer_bodies.append(body)
def draw(self) -> None:
"""Draw all visible layer renderers in reverse order.
Returns:
None
"""
for renderer in reversed(self.renderers):
if not renderer.visible:
continue
texture = renderer.renderer.texture
src = rl.Rectangle(0.0, 0.0, float(texture.width), -float(texture.height))
dest = rl.Rectangle(0.0, 0.0, float(texture.width) * self.scale, float(texture.height) * self.scale)
rl.draw_texture_pro(texture, src, dest, v2(0.0, 0.0), 0.0, rl.WHITE)
def draw_layer(self, layer_id_or_name: str) -> None:
"""Draw a specific layer by IID or identifier.
Args:
layer_id_or_name: Layer IID or identifier.
Returns:
None
"""
if not self.level:
return
layer = None
for layer_inst in self.level.layer_instances or []:
if layer_inst.iid == layer_id_or_name or layer_inst.identifier == layer_id_or_name:
layer = layer_inst
break
if not layer:
return
for renderer in self.renderers:
if renderer.layer_iid == layer.iid:
texture = renderer.renderer.texture
src = rl.Rectangle(0.0, 0.0, float(texture.width), -float(texture.height))
dest = rl.Rectangle(0.0, 0.0, float(texture.width) * self.scale, float(texture.height) * self.scale)
rl.draw_texture_pro(texture, src, dest, v2(0.0, 0.0), 0.0, rl.WHITE)
return
def set_layer_visibility(self, layer_id_or_name: str, visible: bool) -> None:
"""Set a layer's visibility by IID or identifier.
Args:
layer_id_or_name: Layer IID or identifier.
visible: True to show the layer, False to hide it.
Returns:
None
"""
if not self.level:
return
for layer_inst in self.level.layer_instances or []:
if layer_inst.iid == layer_id_or_name or layer_inst.identifier == layer_id_or_name:
for renderer in self.renderers:
if renderer.layer_iid == layer_inst.iid:
renderer.visible = visible
return
def get_layer_by_name(self, name: str) -> Optional[LayerInstance]:
"""Get a layer instance by name.
Args:
name: Layer identifier.
Returns:
The LayerInstance or None.
"""
if not self.level:
return None
for layer in self.level.layer_instances or []:
if layer.identifier == name:
return layer
return None
def get_entities(self) -> List[LdtkEntity]:
"""Get all entities across all layers.
Returns:
List of LdtkEntity wrappers.
"""
if not self.level:
print("LDtk project not loaded.")
return []
entities: List[LdtkEntity] = []
for layer in self.level.layer_instances or []:
if layer.type != "Entities":
continue
for entity in layer.entity_instances:
entities.append(LdtkEntity(entity))
return entities
def get_entities_by_name(self, name: str) -> List[LdtkEntity]:
"""Get all entities by name across all layers.
Args:
name: Entity identifier.
Returns:
List of matching entities.
"""
return [entity for entity in self.get_entities() if entity.entity.identifier == name]
def get_entities_by_tag(self, tag: str) -> List[LdtkEntity]:
"""Get all entities by tag across all layers.
Args:
tag: Tag string.
Returns:
List of matching entities.
"""
return [entity for entity in self.get_entities() if tag in entity.entity.tags]
def get_entity_by_name(self, name: str) -> Optional[LdtkEntity]:
"""Get the first entity by name.
Args:
name: Entity identifier.
Returns:
The first matching entity, or None.
"""
entities = self.get_entities_by_name(name)
return entities[0] if entities else None
def get_entity_by_tag(self, tag: str) -> Optional[LdtkEntity]:
"""Get the first entity by tag.
Args:
tag: Tag string.
Returns:
The first matching entity, or None.
"""
entities = self.get_entities_by_tag(tag)
return entities[0] if entities else None
def convert_to_pixels(self, point: IntPoint) -> Any:
"""Convert grid point to pixels.
Args:
point: Grid point.
Returns:
Vector2 in pixels.
"""
return v2(point.x * self.scale, point.y * self.scale)
def convert_cells_to_pixels(self, cell_point: IntPoint, layer: LayerInstance):
"""Convert cell coordinates to pixels.
Args:
cell_point: Cell coordinates.
layer: Layer instance for cell size.
Returns:
Vector2 in pixels.
"""
cell_size = float(layer.grid_size)
return v2(cell_point.x * cell_size * self.scale, cell_point.y * cell_size * self.scale)
def convert_to_meters(self, point: IntPoint):
"""Convert grid point to meters.
Args:
point: Grid point.
Returns:
b2Vec2 in meters.
"""
if not self.physics:
return b2Vec2(0.0, 0.0)
return self.physics.convert_to_meters(self.convert_to_pixels(point))
def convert_to_grid(self, pixels) -> IntPoint:
"""Convert pixels to grid coordinates.
Args:
pixels: Vector2 in pixels.
Returns:
IntPoint grid coordinate.
"""
return IntPoint(int(pixels.x / self.scale), int(pixels.y / self.scale))
def convert_to_grid_meters(self, meters) -> IntPoint:
"""Convert meters to grid coordinates.
Args:
meters: b2Vec2 in meters.
Returns:
IntPoint grid coordinate.
"""
if not self.physics:
return IntPoint(0, 0)
pixels = self.physics.convert_to_pixels(meters)
return IntPoint(int(pixels.x / self.scale), int(pixels.y / self.scale))
def get_size(self):
"""Get level size in pixels.
Returns:
Vector2 containing level width and height in pixels.
"""
if not self.level:
return v2(0.0, 0.0)
return v2(float(self.level.px_wid) * self.scale, float(self.level.px_hei) * self.scale)