hello alice

This commit is contained in:
Arthur Santos 2024-08-27 16:17:31 -03:00
commit 37b3ef728b
18 changed files with 633 additions and 0 deletions

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
models/
test/
.env/
.git/
**/__pycache__/
*.wav

0
README.md Normal file
View File

11
docs/HELP.md Normal file
View File

@ -0,0 +1,11 @@
projeto_lab: python[3|3.10] src/main.py [--FLAG]
Flags:
--debugging-video Abre o streaming de video.
--debugging-landmark Abre o streaming de video com reconhecimento de gestos.
--debugging-sound Abre o streaming de video e audio com Image2Sound.
--debugging-lab Abre o streaming de video com a maquina de estados de gestos.
--server Abre o debugging para server.
--client Abre o debugging para client.
Exemplo:
python3 src/main.py --debugging-video

3
docs/INSTALL.md Normal file
View File

@ -0,0 +1,3 @@
* sudo apt install portaudio19-dev python3 python3-pip python3-dev
* pip install opencv-python mediapipe pyaudio synthesizer
* mkdir models/ && curl https://storage.googleapis.com/mediapipe-models/gesture_recognizer/gesture_recognizer/float16/latest/gesture_recognizer.task --output models/gesture_recognizer.task

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
opencv-python
mediapipe
pyaudio
pygame
synthesizer

BIN
sounds/kick.ogg Normal file

Binary file not shown.

BIN
sounds/snare.ogg Normal file

Binary file not shown.

45
src/entrypoint/cli.py Normal file
View File

@ -0,0 +1,45 @@
import os
import time
from sistema.io import IO
from sistema.settings import ( DOCS_PATH )
from modulos.debug import Debug
from modulos.stream import Stream
class CLI(object):
    """Maps a single command-line flag onto the matching debug/stream action."""

    def __init__(self):
        pass

    def handler(self, flag):
        """Run the action registered for `flag`, then terminate (exit code 0)."""
        # Lambdas defer name resolution until a flag is actually selected.
        actions = {
            '--debugging-video': lambda: Debug.debug_video(),
            '--debugging-landmark': lambda: Debug.debug_landmark(),
            '--debugging-sound': lambda: Debug.debug_sound(),
            '--debugging-lab': lambda: Debug.debug_lab(),
            '--server': lambda: Stream(is_server=True).server(),
            '--client': lambda: Stream(is_server=False).client(),
            '--help': self.help,
        }
        action = actions.get(flag)
        if action is not None:
            action()
        else:
            print('Flag não encontrada. Use --help para visualizar todas as opções')
        exit(0)

    def help(self):
        """Print docs/HELP.md verbatim."""
        print(IO.read_as_utf8(DOCS_PATH, 'HELP.md'), end='')

12
src/main.py Normal file
View File

@ -0,0 +1,12 @@
import sistema.settings
from entrypoint.cli import CLI
from sys import argv
if __name__ == '__main__':
    # Exactly one flag is required: python3 src/main.py --<flag>
    args = argv[1:]
    if len(args) != 1:
        print('Entrada ruim. Utilize python[3|3.10] src/main.py --help')
        exit(-1)
    CLI().handler(args[0])

90
src/modulos/debug.py Normal file
View File

@ -0,0 +1,90 @@
from sistema.landmark import Landmark
from sistema.sound import Sound
from sistema.video import Video
from sistema.fsm import State
import numpy as np
import cv2 as cv
import threading
import time
class Debug(object):
    """Standalone debug entrypoints.

    Each method opens the local camera and streams frames through one stage
    of the pipeline in an OpenCV window until 'q' is pressed.
    """

    @staticmethod
    def debug_landmark():
        """Show the camera feed with hand-landmark overlays drawn on it."""
        v = Video(is_client=True)
        l = Landmark()
        while True:
            # serialize/deserialize round-trip mirrors the network path.
            frame = v.deserialize(v.serialize())
            frame, _, _, _ = l.recognize(frame)
            cv.imshow('Debug', frame)
            if cv.waitKey(1) == ord('q'):
                break
        v.close()

    @staticmethod
    def debug_sound():
        """Landmark overlay plus Image2Sound: plays a tone derived from the
        fingertip position, throttled to at most one note per 0.4 s."""
        v = Video(is_client=True)
        s = Sound()
        l = Landmark()
        _start = time.time()
        while True:
            frame = v.deserialize(v.serialize())
            frame, x, y, z = l.recognize(frame)
            _end = time.time()
            # BUG FIX: the original referenced self._start, but these are
            # no-arg (static) methods, so `self` was undefined at runtime.
            if _end - _start > 0.4:
                # Renamed from `x` — the original shadowed the coordinate.
                worker = threading.Thread(target=s.play, args=(x, y, z))
                worker.start()
                _start = _end
            cv.imshow('Debug', frame)
            if cv.waitKey(1) == ord('q'):
                break
        s.close()
        v.close()

    @staticmethod
    def debug_video():
        """Show the raw camera feed (serialize/deserialize round-trip only)."""
        v = Video(is_client=True)
        while True:
            frame = v.deserialize(v.serialize())
            cv.imshow('Debug', frame)
            if cv.waitKey(1) == ord('q'):
                break
        v.close()

    @staticmethod
    def debug_lab():
        """Drive the gesture finite-state machine from recognized gestures."""
        v = Video(is_client=True)
        l = Landmark()
        s = State()
        while True:
            frame = v.deserialize(v.serialize())
            frame, gesture_name, position_x = l.read_gesture(frame)
            s.check((gesture_name, position_x))
            cv.imshow('Debug', frame)
            if cv.waitKey(1) == ord('q'):
                break
        v.close()

View File

@ -0,0 +1,30 @@
# TODO
import numpy as np
import cv2 as cv
from sistema.video import Video
class Segmentation(object):
    """Depth/segmentation experiments (work in progress).

    depth_color_map is currently an identity placeholder; debug() streams
    camera frames through it for visual inspection.
    """

    def __init__(self):
        pass

    def depth_color_map(self, frame):
        # TODO: apply a real depth color map; identity for now.
        return frame

    def debug(self):
        # BUG FIX: Video.__init__ requires is_client, so the bare Video()
        # call raised TypeError; the other debug helpers use is_client=True.
        v = Video(is_client=True)
        while True:
            # BUG FIX: the original passed pickled bytes straight to imshow;
            # deserialize first, matching Debug.debug_video.
            frame = v.deserialize(v.serialize())
            frame = self.depth_color_map(frame)
            cv.imshow('Debug', frame)
            if cv.waitKey(1) == ord('q'):
                break
        v.close()

71
src/modulos/stream.py Normal file
View File

@ -0,0 +1,71 @@
import struct
import socket
import cv2 as cv
from sistema.video import Video
from sistema.settings import ( HOST, PORT )
class Stream(object):
    """Point-to-point video streaming over TCP.

    The client pickles camera frames and sends them length-prefixed with an
    8-byte unsigned long long (struct format "Q"); the server reads the
    prefix, reassembles the frame, and displays it.
    """

    def __init__(self, is_server=False):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if is_server:
            self.socket.bind((HOST, PORT))
            self.socket.listen()
        else:
            self.socket.connect((HOST, PORT))

    def server(self):
        """Accept one client and display its frames until 'q' or disconnect."""
        v = Video(is_client=False)
        conn, addr = self.socket.accept()
        PAYLOAD_SIZE = struct.calcsize("Q")
        streaming_data = b''
        while True:
            # Read the 8-byte length prefix.
            while len(streaming_data) < PAYLOAD_SIZE:
                packet = conn.recv(4 * 1024)
                if not packet:
                    break
                streaming_data += packet
            # BUG FIX: the original only broke the inner loop on disconnect
            # and then unpacked a short buffer; end the session instead.
            if len(streaming_data) < PAYLOAD_SIZE:
                break
            packed_msg_size = streaming_data[:PAYLOAD_SIZE]
            streaming_data = streaming_data[PAYLOAD_SIZE:]
            video_msg_size = struct.unpack("Q", packed_msg_size)[0]
            # Read the frame body.
            while len(streaming_data) < video_msg_size:
                packet = conn.recv(4 * 1024)
                # BUG FIX: recv() returning b'' made the original loop spin
                # forever after the client disconnected.
                if not packet:
                    break
                streaming_data += packet
            if len(streaming_data) < video_msg_size:
                break
            serialized_frame = streaming_data[:video_msg_size]
            streaming_data = streaming_data[video_msg_size:]
            frame = v.deserialize(serialized_frame)
            cv.imshow("Stream", frame)
            if cv.waitKey(1) == ord('q'):
                break
        v.close()
        self.close()

    def client(self):
        """Capture frames and send them length-prefixed to the server.

        NOTE(review): the loop has no exit condition, so close() below is
        only reached via an exception — confirm intended.
        """
        v = Video(is_client=True)
        while True:
            serialized_frame = v.serialize()
            serialized_frame = struct.pack("Q", len(serialized_frame)) \
                + serialized_frame
            self.socket.sendall(serialized_frame)
        v.close()
        self.close()

    def close(self):
        self.socket.close()

103
src/sistema/fsm.py Normal file
View File

@ -0,0 +1,103 @@
import os
import pygame
from sistema.settings import ( SOUNDS_PATH )
import threading
import time
class State(object):
def __init__(self):
self.STATES = [
'normal',
'insert',
'edit',
'visual',
'exit'
]
self.init_state = 'normal'
self.curr_state = self.init_state
self.end_state = 'exit'
self.gesture_name = None
self.enter_state = True
self.can_play_audio = True
self._start = time.time()
self.position_x = 0
def check(self, package):
self.gesture_name = package[0]
self.position_x = package[1]
print(self.position_x)
match self.curr_state:
case 'normal':
self.normal_state()
case 'insert':
self.insert_state()
case 'exit':
self.exit_state()
def set_state(self, new_state):
if new_state != self.curr_state:
self.enter_state = True
self.curr_state = new_state
def normal_state(self):
if self.enter_state:
self.can_play_audio = True
self.enter_state = False
self.set_state(self.check_normal())
def insert_state(self):
if self.enter_state:
_end = time.time()
if _end - self._start > 0.4:
x = threading.Thread(target=self.play)
x.start()
self.can_play_audio = False
self._start = _end
self.enter_state = False
self.set_state(self.check_insert())
def exit_state(self):
exit(0)
def check_normal(self):
new_state = self.curr_state
if self.gesture_name == 'Closed_Fist':
new_state = 'insert'
if self.curr_state in ['normal', 'insert'] and self.gesture_name == 'Victory':
new_state = 'exit'
return new_state
def check_insert(self):
new_state = self.curr_state
if self.gesture_name in ['Open_Palm', 'Thumb_Up']:
new_state = 'normal'
if self.curr_state in ['normal', 'insert'] and self.gesture_name == 'Victory':
new_state = 'exit'
return new_state
def play(self):
pygame.mixer.init()
pygame.mixer.music.load(os.path.join(SOUNDS_PATH, 'kick.ogg'))
if self.position_x > 0.5:
pygame.mixer.music.load(os.path.join(SOUNDS_PATH, 'snare.ogg'))
else:
pygame.mixer.music.load(os.path.join(SOUNDS_PATH, 'kick.ogg'))
pygame.mixer.music.play()
pygame.time.delay(1000)
pygame.mixer.music.stop()

27
src/sistema/io.py Normal file
View File

@ -0,0 +1,27 @@
import os
import json
class IO(object):
    """Static helpers for reading/writing UTF-8 text and JSON files.

    FIX: the original methods had no `self` parameter and no decorator, so
    they only worked when invoked through the class object; @staticmethod
    makes them safe on instances too (backward compatible).
    """

    @staticmethod
    def read_as_json(filepath, filename) -> dict:
        """Load os.path.join(filepath, filename) as JSON."""
        with open(os.path.join(filepath, filename), 'r', encoding='utf8') as f:
            data = json.load(f)
        return data

    @staticmethod
    def read_as_utf8(filepath, filename) -> str:
        """Read os.path.join(filepath, filename) as a UTF-8 string."""
        with open(os.path.join(filepath, filename), 'r', encoding='utf8') as f:
            data = f.read()
        return data

    @staticmethod
    def update(filepath, filename, data):
        """Serialize `data` as tab-indented JSON, overwriting the file."""
        with open(os.path.join(filepath, filename), 'w', encoding='utf8') as f:
            json.dump(
                obj=data,
                fp=f,
                ensure_ascii=False,
                indent='\t',
                separators=(',', ': ')
            )

115
src/sistema/landmark.py Normal file
View File

@ -0,0 +1,115 @@
import os
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
from sistema.settings import ( MODELS_PATH )
class Landmark(object):
    """Wrapper around MediaPipe's gesture recognizer.

    Draws hand landmarks onto frames and extracts the index-fingertip
    position (landmark index 8) and/or the recognized gesture name.
    """

    def __init__(self):
        self.mp_hands = mp.solutions.hands
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        self.base_options = python.BaseOptions(
            model_asset_path=os.path.join(MODELS_PATH, 'gesture_recognizer.task')
        )
        self.options = vision.GestureRecognizerOptions(
            base_options=self.base_options,
            num_hands=2
            # running_mode = mp.tasks.vision.RunningMode.VIDEO  # enable for video mode
        )
        self.recognizer = vision.GestureRecognizer.create_from_options(
            self.options
        )

    def _draw_hand(self, frame, hand_landmark):
        """Overlay one hand's landmarks and connections onto `frame`."""
        hand_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        hand_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z)
            for lm in hand_landmark
        ])
        self.mp_drawing.draw_landmarks(
            frame,
            hand_landmarks_proto,
            self.mp_hands.HAND_CONNECTIONS,
            self.mp_drawing_styles.get_default_hand_landmarks_style(),
            self.mp_drawing_styles.get_default_hand_connections_style()
        )

    def recognize(self, frame):
        """Return (frame, x, y, z) of the first hand's index fingertip,
        with x and y scaled by 100 and z by 10; (frame, 0, 0, 0) when no
        hand is detected. The frame is annotated in place."""
        mp_frame = mp.Image(
            image_format=mp.ImageFormat.SRGB,
            data=frame
        )
        recognition_result = self.recognizer.recognize(
            mp_frame
        )
        if len(recognition_result.gestures) == 0:
            return frame, 0, 0, 0
        # Removed unused locals from the original (gesture_name, test_x,
        # test_y); drawing shared with read_gesture via _draw_hand.
        hand_landmarks = recognition_result.hand_landmarks
        for hand_landmark in hand_landmarks:
            self._draw_hand(frame, hand_landmark)
        tip = hand_landmarks[0][8]  # index fingertip
        return frame, tip.x * 100, tip.y * 100, tip.z * 10

    def read_gesture(self, frame):
        """Return (frame, gesture_name, fingertip_x); (frame, None, 0) when
        no hand is detected. The frame is annotated in place."""
        mp_frame = mp.Image(
            image_format=mp.ImageFormat.SRGB,
            data=frame
        )
        recognition_result = self.recognizer.recognize(
            mp_frame
        )
        if len(recognition_result.gestures) == 0:
            return frame, None, 0
        gesture_name = recognition_result.gestures[0][0].category_name
        hand_landmarks = recognition_result.hand_landmarks
        print(gesture_name)
        for hand_landmark in hand_landmarks:
            self._draw_hand(frame, hand_landmark)
        return frame, gesture_name, hand_landmarks[0][8].x

    def serialize(self):
        # Placeholder; not implemented.
        pass

33
src/sistema/settings.py Normal file
View File

@ -0,0 +1,33 @@
import os
from sys import path
from pathlib import Path
from platform import system
BASE_PATH = Path(__file__).resolve().parent.parent.parent
SRC_PATH = Path(__file__).resolve().parent.parent
# Make src/ importable regardless of the working directory.
path.append(str(SRC_PATH))

# --- SSH public key ----------------------------------------------------------
# Empty string when the user has no RSA public key.
SSH_PUBKEY = ''
_pubkey_file = Path.home() / '.ssh' / 'id_rsa.pub'
if _pubkey_file.exists():
    SSH_PUBKEY = _pubkey_file.read_text().replace('\n', '')

# --- Root folder paths -------------------------------------------------------
DOCS_PATH = os.path.join(BASE_PATH, 'docs')
MODELS_PATH = os.path.join(BASE_PATH, 'models')
SOUNDS_PATH = os.path.join(BASE_PATH, 'sounds')

# --- Server settings ---------------------------------------------------------
HOST = '127.0.0.1'
PORT = 65432

37
src/sistema/sound.py Normal file
View File

@ -0,0 +1,37 @@
import pyaudio
from synthesizer import Player, Synthesizer, Waveform
import numpy as np
from types import NoneType
# Base frequencies in Hz (presumably the C-major scale in octave 1,
# C1=32.7 ... B1=61.7 — confirm); Sound.play scales these by powers of 2
# to shift octaves.
CHORD_FREQUENCIES = [
    32.7,
    36.7,
    41.2,
    43.7,
    49.0,
    55.0,
    61.7
]
class Sound(object):
    """Maps (x, y, z) hand coordinates to a synthesized tone (Image2Sound)."""
    def __init__(self):
        # NOTE(review): this player/stream and synthesizer are never used —
        # play() builds its own — and close() does not release the stream
        # opened here; confirm and clean up.
        self.player = Player()
        self.player.open_stream()
        self.synthesizer = Synthesizer(osc1_waveform=Waveform.sawtooth, osc1_volume=0.1, use_osc2=False)
    def play(self, x, y, z) -> NoneType:
        # Plays one chord: x picks the base note and y the octave (both
        # bucketed into 7 bins over 0-100); abs(z) is passed as
        # generate_chord's second argument (presumably duration — confirm).
        player = Player()
        player.open_stream()
        synthesizer = Synthesizer(osc1_waveform=Waveform.square, osc1_volume=0.1, use_osc2=False)
        # 100/7 splits the 0-100 range across the 7 base frequencies.
        chord_mapped = [CHORD_FREQUENCIES[int(x // (100 / 7))] * 2**int(y // (100 / 7))]
        player.play_wave(synthesizer.generate_chord(chord_mapped, abs(z)))
        # NOTE(review): reaches into the private _pyaudio attribute to tear
        # down; check whether Player exposes a public close/terminate API.
        player._pyaudio.terminate()
    def close(self):
        # Currently a no-op; per-play resources are torn down inside play().
        pass

42
src/sistema/video.py Normal file
View File

@ -0,0 +1,42 @@
import pickle
import cv2 as cv
import numpy as np
from types import NoneType
class Video(object):
def __init__(self, is_client):
self.camera = cv.VideoCapture(0)
self.WIDTH = 640
self.HEIGHT = 360
if is_client:
#TODO: Resizible dimensions
#self.WIDTH = int(self.camera.get(cv.CAP_PROP_FRAME_WIDTH))
#self.HEIGHT = int(self.camera.get(cv.CAP_PROP_FRAME_HEIGHT))
#self.WIDTH = 640
#self.HEIGHT = 360
if not self.camera.isOpened():
raise('Não foi possível abrir a câmera.')
def serialize(self) -> bytes:
ret, frame = self.camera.read()
if not ret:
raise('Não foi possível receber o frame.')
return pickle.dumps(frame)
def deserialize(self, serialized_ndarray) -> np.ndarray:
return pickle.loads(serialized_ndarray)
def close(self) -> NoneType:
self.camera.release()
cv.destroyAllWindows()