Here is another neural net I put together from the PyTorch tutorial; posting the finished code here:
!unzip /content/PennFudanPed.zip
import os
import numpy as np
import torch
from PIL import Image
class PennFudanDataset(torch.utils.data.Dataset):
    def __init__(self, root, transforms):
        self.root = root
        self.transforms = transforms
        # load all image files, sorting them to
        # ensure that they are aligned
        self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))
        self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))
    def __getitem__(self, idx):
        # load images and masks
        img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        # note that we haven't converted the mask to RGB,
        # because each color corresponds to a different instance
        # with 0 being background
        mask = Image.open(mask_path)
        # convert the PIL Image into a numpy array
        mask = np.array(mask)
        # instances are encoded as different colors
        obj_ids = np.unique(mask)
        # first id is the background, so remove it
        obj_ids = obj_ids[1:]
        # split the color-encoded mask into a set
        # of binary masks
        masks = mask == obj_ids[:, None, None]
        # get bounding box coordinates for each mask
        num_objs = len(obj_ids)
        boxes = []
        for i in range(num_objs):
            pos = np.where(masks[i])
            xmin = np.min(pos[1])
            xmax = np.max(pos[1])
            ymin = np.min(pos[0])
            ymax = np.max(pos[0])
            boxes.append([xmin, ymin, xmax, ymax])
        # convert everything into a torch.Tensor
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # there is only one class
        labels = torch.ones((num_objs,), dtype=torch.int64)
        masks = torch.as_tensor(masks, dtype=torch.uint8)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # suppose all instances are not crowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        return img, target
    def __len__(self):
        return len(self.imgs)
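# Not part of the tutorial code: a quick sanity check of the dataset, assuming the
# archive above unpacked into ./PennFudanPed. It loads one sample without transforms
# and prints the image size and target shapes.
_check_ds = PennFudanDataset('PennFudanPed', transforms=None)
_check_img, _check_target = _check_ds[0]
print(_check_img.size, _check_target['boxes'].shape, _check_target['masks'].shape)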
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
# load a model pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
# replace the classifier with a new one, that has
# num_classes which is user-defined
num_classes = 2 # 1 class (person) + background
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
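# Quick check (my addition, not in the tutorial): after the swap, the cls_score layer
# of the new head should be a Linear layer with out_features equal to num_classes.
print(model.roi_heads.box_predictor.cls_score)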
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
# load a pre-trained model for classification and return
# only the features
backbone = torchvision.models.mobilenet_v2(weights="DEFAULT").features
# FasterRCNN needs to know the number of
# output channels in a backbone. For mobilenet_v2, it's 1280
# so we need to add it here
backbone.out_channels = 1280
# let's make the RPN generate 5 x 3 anchors per spatial
# location, with 5 different sizes and 3 different aspect
# ratios. We have a Tuple[Tuple[int]] because each feature
# map could potentially have different sizes and
# aspect ratios
anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),),
aspect_ratios=((0.5, 1.0, 2.0),))
# let's define what are the feature maps that we will
# use to perform the region of interest cropping, as well as
# the size of the crop after rescaling.
# if your backbone returns a Tensor, featmap_names is expected to
# be [0]. More generally, the backbone should return an
# OrderedDict[Tensor], and in featmap_names you can choose which
# feature maps to use.
roi_pooler = torchvision.ops.MultiScaleRoIAlign(featmap_names=['0'],
output_size=7,
sampling_ratio=2)
# put the pieces together inside a FasterRCNN model
model = FasterRCNN(backbone,
num_classes=2,
rpn_anchor_generator=anchor_generator,
box_roi_pool=roi_pooler)
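# The mobilenet-backed model above is only the tutorial's illustration of a custom
# backbone and is not used further below. A smoke test (my addition): run it on
# random images in eval mode and look at the prediction keys.
model.eval()
with torch.no_grad():
    _out = model([torch.rand(3, 300, 400), torch.rand(3, 500, 400)])
print(_out[0].keys())  # expected keys: boxes, labels, scores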
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pre-trained on COCO
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    # now get the number of input features for the mask classifier
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    # and replace the mask predictor with a new one
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                       hidden_layer,
                                                       num_classes)
    return model
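# Optional check of the helper (my addition): the rebuilt mask head should end in a
# conv layer with num_classes output channels.
_m = get_model_instance_segmentation(2)
print(_m.roi_heads.mask_predictor.mask_fcn_logits)
del _m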
from torch import Tensor, nn
from torchvision import ops
from torchvision.transforms import functional as F, transforms as T
class Compose():
    def __init__(self, transforms):
        self.transforms = transforms
    def __call__(self, image, target):
        for t in self.transforms:
            image, target = t(image, target)
        return image, target
from typing import *
class PILToTensor(nn.Module):
    def forward(self, image, target=None):
        image = F.pil_to_tensor(image)
        return image, target
class ConvertImageDtype(nn.Module):
    def __init__(self, dtype):
        super().__init__()
        self.dtype = dtype
    def forward(self, image, target):
        image = F.convert_image_dtype(image, dtype=self.dtype)
        return image, target
class RandomHorizontalFlip(T.RandomHorizontalFlip):
    def forward(self, image, target=None):
        if torch.rand(1) < self.p:
            image = F.hflip(image)
            if target is not None:
                _, _, width = F.get_dimensions(image)
                target['boxes'][:, [2, 0]] = width - target['boxes'][:, [0, 2]]
                # the target key is 'masks', not 'mask'
                if 'masks' in target:
                    target['masks'] = target['masks'].flip(-1)
        return image, target
def get_transforms(train: bool = False):
    transforms = [
        PILToTensor(),
        ConvertImageDtype(torch.float)
    ]
    if train:
        transforms.append(RandomHorizontalFlip(0.5))
    return Compose(transforms)
def collate_fn(batch):
    return tuple(zip(*batch))
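# A small check of the transform pipeline (my addition): PILToTensor followed by
# ConvertImageDtype should give a float32 CHW tensor in [0, 1], and the flip keeps
# the target keys intact.
_t_img, _t_target = PennFudanDataset('PennFudanPed', get_transforms(train=True))[0]
print(_t_img.dtype, _t_img.shape, list(_t_target.keys()))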
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
dataset = PennFudanDataset('PennFudanPed', get_transforms(train=True))
data_loader = torch.utils.data.DataLoader(
dataset, batch_size=2, shuffle=True, num_workers=2,
collate_fn=collate_fn)
# For Training
images, targets = next(iter(data_loader))
images = list(image for image in images)
targets = [{k: v for k, v in t.items()} for t in targets]
output = model(images,targets)
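# In train mode a torchvision detection model returns a dict of losses rather than
# predictions, so this single batch is only a pipeline check; the keys are roughly
# loss_classifier, loss_box_reg, loss_objectness and loss_rpn_box_reg.
print(output)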
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = 2
dataset = PennFudanDataset('PennFudanPed', get_transforms(train=True))
dataset_test = PennFudanDataset('PennFudanPed', get_transforms(train=False))
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])
# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
dataset, batch_size=2, shuffle=True, num_workers=2,
collate_fn=collate_fn)
data_loader_test = torch.utils.data.DataLoader(
dataset_test, batch_size=1, shuffle=False, num_workers=2,
collate_fn=collate_fn)
# get the model using our helper function
model = get_model_instance_segmentation(num_classes)
# move model to the right device
model.to(device)
# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
step_size=3,
gamma=0.1)
from tqdm import tqdm
def train_one_epoch(model, dataloader, optimizer, device):
    model.train()
    epoch_loss = 0.0
    for images, targets in tqdm(dataloader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in target.items()} for target in targets]
        loss_dict = model(images, targets)
        loss = sum(l for l in loss_dict.values())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(dataloader)
@torch.no_grad()
def evaluate(model, dataloader, device):
    # the model is intentionally left in train mode: torchvision detection models
    # only return the loss dict in train mode, and @torch.no_grad() already
    # disables gradient tracking for this validation pass
    epoch_loss = 0.0
    for images, targets in tqdm(dataloader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in target.items()} for target in targets]
        loss_dict = model(images, targets)
        loss = sum(l for l in loss_dict.values())
        epoch_loss += loss.item()
    return epoch_loss / len(dataloader)
epochs = 10
for epoch in range(1, epochs + 1):
    train_loss = train_one_epoch(model, data_loader, optimizer, device)
    val_loss = evaluate(model, data_loader_test, device)
    lr_scheduler.step()
    print(f'Epoch {epoch}\nTrain Loss = {train_loss:.2f}\nValidation Loss = {val_loss:.2f}')
img, _ = dataset_test[0]
model.eval()
with torch.no_grad():
    prediction = model([img.to(device)])
from PIL import ImageDraw
img_copy = Image.fromarray(prediction[0]['masks'][0, 0].mul(255).byte().cpu().numpy())
state_dict = prediction[0]
draw = ImageDraw.Draw(img_copy)
for i in range(len(state_dict['boxes'])):
    draw.rectangle(state_dict['boxes'][i].cpu().numpy(), outline="#FF0000")
img_copy
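# Alternative visualisation (my addition, not in the tutorial): draw the same
# predicted boxes on the original test image instead of on the first mask.
_orig = Image.fromarray(img.mul(255).permute(1, 2, 0).byte().cpu().numpy())
_draw = ImageDraw.Draw(_orig)
for _box in prediction[0]['boxes']:
    _draw.rectangle(_box.cpu().numpy(), outline="#FF0000")
_orig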
with open('segmentation_model.pt', 'wb') as f:
    # save only the weights so they can be restored with load_state_dict below
    torch.save(model.state_dict(), f)
device = torch.device('cuda:0')
model = get_model_instance_segmentation(2)
model.to(device)
model.load_state_dict(torch.load('./segmentation_model (1).pt', map_location=device))
model.eval()
from PIL import ImageDraw
import cv2
def convert_to_pil(img: np.ndarray) -> Image:
    # OpenCV gives frames in BGR order; the model expects RGB
    return Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
def predict(img: Image):
    img = F.pil_to_tensor(img)
    img = F.convert_image_dtype(img, dtype=torch.float)
    img = img.to(device)
    with torch.no_grad():
        prediction = model([img])
    return prediction
def draw_boxes(img: Image, prediction):
    state_dict = prediction[0]
    draw = ImageDraw.Draw(img)
    for i in range(len(state_dict['labels'])):
        draw.rectangle(state_dict['boxes'][i].cpu().numpy(), outline="#FF0000")
    return np.array(img)
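# The raw predictions include many low-confidence boxes; a score-filtered variant
# (my addition, with an arbitrary 0.7 threshold) can be swapped into process_video
# below if the output looks too noisy.
def draw_boxes_filtered(img: Image, prediction, score_threshold: float = 0.7):
    state_dict = prediction[0]
    draw = ImageDraw.Draw(img)
    for box, score in zip(state_dict['boxes'], state_dict['scores']):
        if score >= score_threshold:
            draw.rectangle(box.cpu().numpy(), outline="#FF0000")
    return np.array(img)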
from tqdm import tqdm
import math
def process_video(model, video_path, frame_rate = 30, max_frame = None):
    capture = cv2.VideoCapture(video_path)
    fps = capture.get(cv2.CAP_PROP_FPS)
    print(f'Frame Rate: {fps}')
    length = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    # keep every multiplier-th frame so the effective rate does not exceed frame_rate
    multiplier = int(math.ceil(fps / frame_rate)) if fps > frame_rate else 1
    box_frames = []
    for n in tqdm(range(length)):
        ret, frame = capture.read()
        if not ret:
            break
        if max_frame is not None and n >= max_frame:
            break
        if n % multiplier != 0:
            continue
        img = convert_to_pil(frame)
        prediction = predict(img)
        box_frames.append(draw_boxes(img, prediction))
    capture.release()
    return box_frames
frames = process_video(model, './videoplayback (1).mp4', 30, 1000)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
# the output size must match the frame size, otherwise VideoWriter produces an unreadable file
height, width = frames[0].shape[:2]
writer = cv2.VideoWriter('./out.avi', fourcc, 30.0, (width, height))
for frame in frames:
    # frames are RGB numpy arrays, VideoWriter expects BGR
    writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
writer.release()
len(frames)