from DepthEstimator import DepthEstimator import numpy as np from PIL import Image import os from GenerateCaptions import generate_caption import re from config import LOGS_DIR from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection import torch from PIL import Image, ImageDraw, ImageFont import spacy import gc class SoundMapper: def __init__(self): self.depth_estimator = DepthEstimator() # List of depth maps in dict["predicted_depth" ,"depth"] in (tensor, PIL.Image) format self.device = "cuda" # self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir) self.map_list = None self.image_dir = self.depth_estimator.image_dir # self.nlp = spacy.load("en_core_web_sm") self.nlp = None self.dino = None self.dino_processor = None # self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny").to(self.device) # self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny") def _load_nlp(self): if self.nlp is None: self.nlp = spacy.load("en_core_web_sm") return self.nlp def _load_depth_maps(self): if self.map_list is None: self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir) return self.map_list def process_depth_maps(self) -> list: depth_maps = self._load_depth_maps() processed_maps = [] for item in depth_maps: depth_map = item["depth"] depth_array = np.array(depth_map) normalization = depth_array / 255.0 processed_maps.append({ "original": depth_map, "normalization": normalization }) return processed_maps # def create_depth_zone(self, processed_maps : list, num_zones = 3): # zones_data = [] # for depth_data in processed_maps: # normalized = depth_data["normalization"] # thresholds = np.linspace(0, 1, num_zones+1) # zones = [] # for i in range(num_zones): # zone_mask = (normalized >= thresholds[i]) & (normalized < thresholds[i+1]) # zone_percentage = zone_mask.sum() / zone_mask.size # zones.append({ # "range": (thresholds[i], thresholds[i+1]), # "percentage": zone_percentage, # "mask": zone_mask # }) # zones_data.append(zones) # return zones_data def detect_sound_sources(self, caption_text: str) -> dict: """ Extract nouns and their sound descriptions from caption text. Returns a dictionary mapping nouns to their descriptions. """ sound_sources = {} nlp = self._load_nlp() print(f"\n[DEBUG] Beginning sound source detection") print(f"Raw caption text length: {len(caption_text)}") print(f"First 100 chars: {caption_text[:100]}...") # Split the caption by newlines to separate entries lines = caption_text.strip().split('\n') print(f"Found {len(lines)} lines after splitting") for i, line in enumerate(lines): # Skip empty lines if not line.strip(): continue print(f"Processing line {i}: {line[:50]}{'...' if len(line) > 50 else ''}") # Check if line matches the expected format (Noun: description) if ':' in line: parts = line.split(':', 1) # Split only on the first colon # Clean up the noun part - remove numbers and leading/trailing whitespace noun_part = parts[0].strip().lower() # Remove list numbering (e.g., "1. ", "2. ", etc.) noun_part = re.sub(r'^\d+\.\s*', '', noun_part) description = parts[1].strip() # Clean any markdown formatting noun = re.sub(r'[*()]', '', noun_part).strip() description = re.sub(r'[*()]', '', description).strip() # Separate the description at em dash if present if ' — ' in description: description = description.split(' — ', 1)[0].strip() elif ' - ' in description: description = description.split(' - ', 1)[0].strip() print(f" - Found potential noun: '{noun}' with description: '{description[:30]}...'") # Skip if noun contains invalid characters or is too short if '##' not in noun and len(noun) > 1 and noun[0].isalpha(): sound_sources[noun] = description print(f" √ Added to sound sources") else: print(f" × Skipped (invalid format)") # If no structured format found, try to extract nouns from the text if not sound_sources: print("No structured format found, falling back to noun extraction") all_nouns = [] doc = nlp(caption_text) for token in doc: if token.pos_ == "NOUN" and len(token.text) > 1: if token.text[0].isalpha(): all_nouns.append(token.text.lower()) print(f" - Extracted noun: '{token.text.lower()}'") for noun in all_nouns: sound_sources[noun] = "" # Empty description print(f"[DEBUG] Final detected sound sources: {list(sound_sources.keys())}") return sound_sources def map_bbox_to_depth_zone(self, bbox, depth_map, num_zones=3): x1, y1, x2, y2 = [int(coord) for coord in bbox] height, width = depth_map.shape x1, y1 = max(0, x1), max(0, y1) x2, y2 = min(width, x2), min(height, y2) depth_roi = depth_map[y1:y2, x1:x2] if depth_roi.size == 0: return num_zones - 1 mean_depth = np.mean(depth_roi) thresholds = self.create_histogram_depth_zones(depth_map, num_zones) for i in range(num_zones): if thresholds[i] <= mean_depth < thresholds[i+1]: return i return num_zones - 1 def detect_objects(self, nouns : list, image: Image): filtered_nouns = [] for noun in nouns: if '##' not in noun and len(noun) > 1 and noun[0].isalpha(): filtered_nouns.append(noun) print(f"Detecting objects for nouns: {filtered_nouns}") if self.dino is None: self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-base").to(self.device) self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base") else: self.dino = self.dino.to(self.device) text_prompt = " . ".join(filtered_nouns) inputs = self.dino_processor(images=image, text=text_prompt, return_tensors="pt").to(self.device) with torch.no_grad(): outputs = self.dino(**inputs) results = self.dino_processor.post_process_grounded_object_detection( outputs, inputs.input_ids, box_threshold=0.25, text_threshold=0.25, target_sizes=[image.size[::-1]] ) result = results[0] labels = result["labels"] bboxes = result["boxes"] clean_labels = [] for label in labels: clean_label = re.sub(r'##\w+', '', label) clean_label = self._split_combined_words(clean_label, filtered_nouns) clean_labels.append(clean_label) self.dino = self.dino.to("cpu") torch.cuda.empty_cache() del inputs, outputs, results print(f"Detected objects: {clean_labels}") return (clean_labels, bboxes) def _split_combined_words(self, text, nouns=None): nlp = self._load_nlp() if nouns is None: known_words = set() doc = nlp(text) for token in doc: if token.pos_ == "NOUN" and len(token.text) > 1: known_words.add(token.text.lower()) else: known_words = set(nouns) result = [] for word in text.split(): if word in known_words: result.append(word) continue found = False for known in known_words: if known in word and len(known) > 2: result.append(known) found = True if not found: result.append(word) return " ".join(result) def process_dino_labels(self, labels): processed_labels = [] nlp = self._load_nlp() for label in labels: if label.startswith('##'): continue label = re.sub(r'[*()]', '', label).strip() parts = label.split() for part in parts: if part.startswith('##'): continue doc = nlp(part) for token in doc: if token.pos_ == "NOUN" and len(token.text) > 1: processed_labels.append(token.text.lower()) unique_labels = [] for label in processed_labels: if label not in unique_labels: unique_labels.append(label) return unique_labels def create_histogram_depth_zones(self, depth_map, num_zones = 3): # using 50 bins because it is faster hist, bin_edge = np.histogram(depth_map.flatten(), bins=50, range=(0, 1)) cumulative = np.cumsum(hist) / np.sum(hist) thresholds = [0.0] for i in range(1, num_zones): target = i / num_zones idx = np.argmin(np.abs(cumulative - target)) thresholds.append(bin_edge[idx + 1]) thresholds.append(1.0) return thresholds def analyze_object_depths(self, image_path, depth_map, lat, lon, caption_data=None, all_objects=False): image = Image.open(image_path) if caption_data is None: caption = generate_caption(lat, lon) if not caption: print(f"Failed to generate caption for {image_path}") return [] caption_text = caption.get("sound_description", "") else: caption_text = caption_data.get("sound_description", "") # Debug: Print the raw caption text print(f"\n[DEBUG] Raw caption text for {os.path.basename(image_path)}:") print(caption_text) print("-" * 50) if not caption_text: print(f"No caption text available for {image_path}") return [] # Extract nouns and their sound descriptions sound_sources = self.detect_sound_sources(caption_text) # Debug: Print the extracted sound sources print(f"[DEBUG] Extracted sound sources:") for noun, desc in sound_sources.items(): print(f" - {noun}: {desc}") print("-" * 50) if not sound_sources: print(f"No sound sources detected in caption for {image_path}") return [] # Get list of nouns only for object detection nouns = list(sound_sources.keys()) # Debug: Print the list of nouns being used for detection print(f"[DEBUG] Nouns for object detection: {nouns}") print("-" * 50) labels, bboxes = self.detect_objects(nouns, image) if len(labels) == 0 or len(bboxes) == 0: print(f"No objects detected in {image_path}") return [] object_data = [] known_objects = set(nouns) if nouns else set() for i, (label, bbox) in enumerate(zip(labels, bboxes)): if '##' in label: continue x1, y1, x2, y2 = [int(coord) for coord in bbox] height, width = depth_map.shape x1, y1 = max(0, x1), max(0, y1) x2, y2 = min(width, x2), min(height, y2) depth_roi = depth_map[y1:y2, x1:x2] if depth_roi.size == 0: continue mean_depth = np.mean(depth_roi) matched_noun = None matched_desc = None for word in label.split(): word = word.lower() if word in sound_sources: matched_noun = word matched_desc = sound_sources[word] break if matched_noun is None: for noun in sound_sources: if noun in label.lower(): matched_noun = noun matched_desc = sound_sources[noun] break if matched_noun is None: for word in label.split(): if len(word) > 1 and word[0].isalpha() and '##' not in word: matched_noun = word.lower() matched_desc = "" # No description available break if matched_noun: thresholds = self.create_histogram_depth_zones(depth_map, num_zones=3) zone = 0 # The default is 0 which is the closest zone for i in range(3): if thresholds[i] <= mean_depth < thresholds[i+1]: zone = i break object_data.append({ "original_label": matched_noun, "bbox": bbox.tolist(), "depth_zone": zone, "zone_description": ["near", "medium", "far"][zone], "mean_depth": mean_depth, "weight": 1.0 - mean_depth, "sound_description": matched_desc }) if all_objects: object_data.sort(key=lambda x: x["mean_depth"]) return object_data else: if not object_data: return [] closest_object = min(object_data, key=lambda x: x["mean_depth"]) return [closest_object] def cleanup(self): if hasattr(self, 'depth_estimator') and self.depth_estimator is not None: del self.depth_estimator self.depth_estimator = None if self.map_list is not None: del self.map_list self.map_list = None if self.dino is not None: self.dino = self.dino.to("cpu") del self.dino self.dino = None del self.dino_processor self.dino_processor = None if self.nlp is not None: del self.nlp self.nlp = None torch.cuda.empty_cache() gc.collect() def test_object_depth_analysis(self): """ Test the object depth analysis on all images in the directory. """ # Process depth maps first processed_maps = self.process_depth_maps() # Get list of original image paths image_dir = self.depth_estimator.image_dir image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.endswith(".jpg")] results = [] # For each image and its corresponding depth map for i, (image_path, processed_map) in enumerate(zip(image_paths, processed_maps)): # Extract the normalized depth map depth_map = processed_map["normalization"] # Analyze objects and their depths object_depths = self.analyze_object_depths(image_path, depth_map) # Store results results.append({ "image_path": image_path, "object_depths": object_depths }) # Print some information for debugging print(f"Analyzed {image_path}:") for obj in object_depths: print(f" - {obj['original_label']} (Zone: {obj['zone_description']})") return results