"""
Copyright (C) 2023 Adobe.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
This is a test.
", "utf-8")) def do_POST(self): if self.validateRequest(self.headers): _content_len = int(self.headers.get('Content-Length')) _data = self.rfile.read(_content_len) self._set_response(200) if _data is not None: _json_data = json.loads(_data) self._call_listeners("post", _json_data) def do_PATCH(self): if self.validateRequest(self.headers): self._set_response(200) def log_message(self, format, *args): # Override this function to prevent log spam pass class SRE_Server(): def __init__(self): self.server = None self.thread = None self.host = SERVER_HOST self.port = None self.client = SRE_Client() def is_running(self): if self.server: return True else: return False def get_port(self): return self.port def start(self): if self.server is None: self.server = SRE_HTTPServer((self.host, 0), SRE_Handler) self.port = self.server.server_port self.thread = threading.Thread(None, self.server.run) self.thread.daemon = True self.thread.start() return (Code_Response.success, None) return (Code_Response.server_start_error, None) def stop(self): if self.server is not None: self.server.shutdown() self.thread.join() self.server = None self.thread = None self.port = None return (Code_Response.success, None) return (Code_Response.server_stop_error, None) def send_message(self, type, endpoint, op, data={}): if type == Code_RequestType.r_sync: _result = self.client.sync_request( endpoint, op, data=data) return _result elif type == Code_RequestType.r_async: self.client.async_request( endpoint, op, data=data) return (Code_Response.success, None) else: return (Code_Response.server_request_type_error, None)