first commit
This commit is contained in:
104
clip/vectorize.py
Normal file
104
clip/vectorize.py
Normal file
@@ -0,0 +1,104 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Tuple
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from PIL import Image
|
||||
|
||||
from common.image_io import fetch_url_bytes, bytes_to_pil, ImageLoadError
|
||||
|
||||
try:
|
||||
import open_clip
|
||||
except Exception:
|
||||
open_clip = None
|
||||
|
||||
try:
|
||||
from transformers import CLIPProcessor, CLIPModel
|
||||
except Exception:
|
||||
CLIPModel = None
|
||||
CLIPProcessor = None
|
||||
|
||||
|
||||
def load_openclip(model_name: str = "ViT-B-32", pretrained: str = "openai") -> Tuple:
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
if open_clip is None:
|
||||
raise RuntimeError("open_clip is not installed")
|
||||
model, _, preprocess = open_clip.create_model_and_transforms(model_name, pretrained=pretrained)
|
||||
model = model.to(device).eval()
|
||||
return model, preprocess, device
|
||||
|
||||
|
||||
def embed_openclip(model, preprocess, device, pil_image: Image.Image) -> np.ndarray:
|
||||
image_input = preprocess(pil_image).unsqueeze(0).to(device)
|
||||
with torch.no_grad():
|
||||
image_features = model.encode_image(image_input)
|
||||
image_features = image_features / image_features.norm(dim=-1, keepdim=True)
|
||||
return image_features.cpu().numpy()[0]
|
||||
|
||||
|
||||
def load_hf_clip(model_name: str = "openai/clip-vit-base-patch32") -> Tuple:
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
if CLIPModel is None or CLIPProcessor is None:
|
||||
raise RuntimeError("transformers (CLIP) is not installed")
|
||||
model = CLIPModel.from_pretrained(model_name).to(device).eval()
|
||||
processor = CLIPProcessor.from_pretrained(model_name)
|
||||
return model, processor, device
|
||||
|
||||
|
||||
def embed_hf_clip(model, processor, device, pil_image: Image.Image) -> np.ndarray:
|
||||
inputs = processor(images=pil_image, return_tensors="pt")
|
||||
inputs = {k: v.to(device) for k, v in inputs.items()}
|
||||
with torch.no_grad():
|
||||
feats = model.get_image_features(**inputs)
|
||||
feats = feats / feats.norm(dim=-1, keepdim=True)
|
||||
return feats.cpu().numpy()[0]
|
||||
|
||||
|
||||
def load_image(path_or_url: str) -> Image.Image:
|
||||
if path_or_url.startswith("http://") or path_or_url.startswith("https://"):
|
||||
data = fetch_url_bytes(path_or_url)
|
||||
return bytes_to_pil(data)
|
||||
else:
|
||||
return Image.open(path_or_url).convert("RGB")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Vectorize an image using CLIP (open_clip or HuggingFace)")
|
||||
parser.add_argument("input", help="Path to image file or URL")
|
||||
parser.add_argument("--backend", choices=("openclip", "hf"), default="openclip")
|
||||
parser.add_argument("--model", default=None, help="Model name (backend-specific)")
|
||||
parser.add_argument("--pretrained", default="openai", help="open_clip pretrained source (openclip backend)")
|
||||
parser.add_argument("--out", default=None, help="Output .npy path (defaults to stdout)")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
img = load_image(args.input)
|
||||
except ImageLoadError as e:
|
||||
raise SystemExit(f"Failed to load image: {e}")
|
||||
|
||||
if args.backend == "openclip":
|
||||
model_name = args.model or os.getenv("MODEL_NAME", "ViT-B-32")
|
||||
pretrained = args.pretrained
|
||||
model, preprocess, device = load_openclip(model_name, pretrained=pretrained)
|
||||
vec = embed_openclip(model, preprocess, device, img)
|
||||
else:
|
||||
model_name = args.model or "openai/clip-vit-base-patch32"
|
||||
model, processor, device = load_hf_clip(model_name)
|
||||
vec = embed_hf_clip(model, processor, device, img)
|
||||
|
||||
vec = np.asarray(vec, dtype=np.float32)
|
||||
|
||||
if args.out:
|
||||
np.save(args.out, vec)
|
||||
print(f"Saved vector shape={vec.shape} to {args.out}")
|
||||
else:
|
||||
# Print a short summary and the vector length. Full vector to stdout can be large.
|
||||
print(f"vector_shape={vec.shape}")
|
||||
print(np.array2string(vec, precision=6, separator=", "))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user