diff --git a/dtable_events/activities/dtable_update_handler.py b/dtable_events/activities/dtable_update_handler.py index 7eaa31c5..4008c185 100644 --- a/dtable_events/activities/dtable_update_handler.py +++ b/dtable_events/activities/dtable_update_handler.py @@ -8,10 +8,10 @@ class DTableUpdateHander(object): - def __init__(self, app, config): + def __init__(self, app): self.app = app self._enabled = True - self._db_session_class = init_db_session_class(config) + self._db_session_class = init_db_session_class() def start(self): logging.info('Start dtable update scanner') diff --git a/dtable_events/activities/handlers.py b/dtable_events/activities/handlers.py index 35d31a31..5d7511bd 100644 --- a/dtable_events/activities/handlers.py +++ b/dtable_events/activities/handlers.py @@ -26,11 +26,11 @@ class MessageHandler(Thread): 'update_rows_links' ] - def __init__(self, app, config): + def __init__(self, app): Thread.__init__(self) self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() self.app = app def run(self): diff --git a/dtable_events/api_calls/api_calls_counter.py b/dtable_events/api_calls/api_calls_counter.py index a062c12b..c66950c9 100644 --- a/dtable_events/api_calls/api_calls_counter.py +++ b/dtable_events/api_calls/api_calls_counter.py @@ -20,10 +20,10 @@ class APICallsCounter: - def __init__(self, config): + def __init__(self): self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() self.keep_months = 3 # including this month def count_api_gateway(self, info, db_session): diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py index e3f926df..49b6ad03 100644 --- a/dtable_events/app/app.py +++ b/dtable_events/app/app.py @@ -35,53 +35,52 @@ class App(object): - def __init__(self, config, seafile_config, task_mode): + def __init__(self, task_mode): self._enable_foreground_tasks = task_mode.enable_foreground_tasks self._enable_background_tasks = task_mode.enable_background_tasks self.dtable_update_cache = DTableUpdateCacheManager() - self._stats_sender = StatsSender(config) + self._stats_sender = StatsSender() self._playwright_manager = get_playwright_manager() if self._enable_foreground_tasks: - self._dtable_io_server = DTableIOServer(self, config) + self._dtable_io_server = DTableIOServer(self) if self._enable_background_tasks: # redis client subscriber - self._message_handler = MessageHandler(self, config) - self._notification_rule_handler = NotificationRuleHandler(config) - self._user_activity_counter = UserActivityCounter(config) - self._dtable_real_time_rows_counter = DTableRealTimeRowsCounter(config) - self._workflow_actions_handler = WorkflowActionsHandler(config) - self._webhooker = Webhooker(config) - self._api_calls_counter = APICallsCounter(config) - self._metric_manager = MetricManager(config) - self._universal_app_auto_backup = UniversalAppAutoBackup(config) + self._message_handler = MessageHandler(self) + self._notification_rule_handler = NotificationRuleHandler() + self._user_activity_counter = UserActivityCounter() + self._dtable_real_time_rows_counter = DTableRealTimeRowsCounter() + self._workflow_actions_handler = WorkflowActionsHandler() + self._webhooker = Webhooker() + self._api_calls_counter = APICallsCounter() + self._metric_manager = 
MetricManager() + self._universal_app_auto_backup = UniversalAppAutoBackup() # cron jobs - self._instant_notices_sender = InstantNoticeSender(config) - self._email_notices_sender = EmailNoticesSender(config) - self._dtables_cleaner = DTablesCleaner(config) - self._dtable_updates_sender = DTableUpdatesSender(config) - self._dtable_notification_rules_scanner = DTableNofiticationRulesScanner(config) - # self._dtable_automation_rules_scanner = DTableAutomationRulesScanner(config) - self._ldap_syncer = LDAPSyncer(config) - self._common_dataset_syncer = CommonDatasetSyncer(self, config) - self._big_data_storage_stats_worker = BigDataStorageStatsWorker(config) - self._clean_db_records_worker = CleanDBRecordsWorker(config) - self._data_sync = DataSyncer(config) - self._workflow_schedule_scanner = WorkflowSchedulesScanner(config) - self._dtable_asset_trash_cleaner = DTableAssetTrashCleaner(config) + self._instant_notices_sender = InstantNoticeSender() + self._email_notices_sender = EmailNoticesSender() + self._dtables_cleaner = DTablesCleaner() + self._dtable_updates_sender = DTableUpdatesSender() + self._dtable_notification_rules_scanner = DTableNofiticationRulesScanner() + self._ldap_syncer = LDAPSyncer() + self._common_dataset_syncer = CommonDatasetSyncer(self) + self._big_data_storage_stats_worker = BigDataStorageStatsWorker() + self._clean_db_records_worker = CleanDBRecordsWorker() + self._data_sync = DataSyncer() + self._workflow_schedule_scanner = WorkflowSchedulesScanner() + self._dtable_asset_trash_cleaner = DTableAssetTrashCleaner() self._license_expiring_notices_sender = LicenseExpiringNoticesSender() - self._dtable_access_log_cleaner = DTableFileAccessLogCleaner(config) - self._dtable_update_handler = DTableUpdateHander(self, config) + self._dtable_access_log_cleaner = DTableFileAccessLogCleaner() + self._dtable_update_handler = DTableUpdateHander(self) # interval - self._virus_scanner = VirusScanner(config, seafile_config) + self._virus_scanner = VirusScanner() # ai stats, listen redis and cron - self.ai_stats_worker = AIStatsWorker(config) + self.ai_stats_worker = AIStatsWorker() # automations pipeline - self._automations_pipeline = AutomationsPipeline(config) + self._automations_pipeline = AutomationsPipeline() def serve_forever(self): @@ -95,7 +94,6 @@ def serve_forever(self): self._metric_manager.start() # always True, ready to collect metrics self._message_handler.start() # always True self._notification_rule_handler.start() # always True - # self._automation_rule_handler.start() # always True self._user_activity_counter.start() # always True self._dtable_real_time_rows_counter.start() # default True self._workflow_actions_handler.start() # always True @@ -111,7 +109,7 @@ def serve_forever(self): self._ldap_syncer.start() # default False self._common_dataset_syncer.start() # default True self._big_data_storage_stats_worker.start() # always True - self._clean_db_records_worker.start() # default false + self._clean_db_records_worker.start() # default True self._data_sync.start() # default True self._workflow_schedule_scanner.start() # default True self._dtable_asset_trash_cleaner.start() # always True @@ -121,7 +119,7 @@ def serve_forever(self): # interval self._virus_scanner.start() # default False # ai stats, listen redis and cron - self.ai_stats_worker.start() # default False + self.ai_stats_worker.start() # default True # automations pipeline self._automations_pipeline.start() # always True diff --git a/dtable_events/app/config.py b/dtable_events/app/config.py index 
e88b8668..a6dc9299 100644 --- a/dtable_events/app/config.py +++ b/dtable_events/app/config.py @@ -2,67 +2,202 @@ import configparser import logging import os -import sys -logger = logging.getLogger(__name__) +from dtable_events.app.config_parser import ConfigParser + +dtable_web_dir = os.environ.get('DTABLE_WEB_DIR', '/opt/seatable/seatable-server-latest/dtable-web') +logger = logging.getLogger(__name__) -# DTABLE_WEB_DIR -dtable_web_dir = os.environ.get('DTABLE_WEB_DIR', '') -if not dtable_web_dir: - logging.critical('dtable_web_dir is not set') - raise RuntimeError('dtable_web_dir is not set') -if not os.path.exists(dtable_web_dir): - logging.critical('dtable_web_dir %s does not exist' % dtable_web_dir) - raise RuntimeError('dtable_web_dir does not exist.') - -sys.path.insert(0, dtable_web_dir) - -try: - # settings in dtable-web - import seahub.settings as seahub_settings - DTABLE_WEB_SERVICE_URL = getattr(seahub_settings, 'DTABLE_WEB_SERVICE_URL', 'http://127.0.0.1') - DTABLE_PRIVATE_KEY = getattr(seahub_settings, 'DTABLE_PRIVATE_KEY', '') - ENABLE_DTABLE_SERVER_CLUSTER = getattr(seahub_settings, 'ENABLE_DTABLE_SERVER_CLUSTER', False) - FILE_SERVER_ROOT = getattr(seahub_settings, 'FILE_SERVER_ROOT', 'http://127.0.0.1:8082') - INNER_FILE_SERVER_ROOT = getattr(seahub_settings, 'INNER_FILE_SERVER_ROOT', 'http://127.0.0.1:8082') - SEATABLE_FAAS_AUTH_TOKEN = getattr(seahub_settings, 'SEATABLE_FAAS_AUTH_TOKEN', '') - ENABLE_PYTHON_SCRIPT = getattr(seahub_settings, 'ENABLE_PYTHON_SCRIPT', '') - SEATABLE_FAAS_URL = getattr(seahub_settings, 'SEATABLE_FAAS_URL', '') - SECRET_KEY = getattr(seahub_settings, 'SECRET_KEY', '') - SESSION_COOKIE_NAME = getattr(seahub_settings, 'SESSION_COOKIE_NAME', 'sessionid') - EXPORT2EXCEL_DEFAULT_STRING = getattr(seahub_settings, 'EXPORT2EXCEL_DEFAULT_STRING', 'illegal character in excel') - TIME_ZONE = getattr(seahub_settings, 'TIME_ZONE', 'UTC') - INNER_DTABLE_DB_URL = getattr(seahub_settings, 'INNER_DTABLE_DB_URL', 'http://127.0.0.1:7777') - ENABLE_WEIXIN = getattr(seahub_settings, 'ENABLE_WEIXIN', False) - ENABLE_WORK_WEIXIN = getattr(seahub_settings, 'ENABLE_WORK_WEIXIN', False) - ENABLE_DINGTALK = getattr(seahub_settings, 'ENABLE_DINGTALK', False) - USE_INNER_DTABLE_SERVER = getattr(seahub_settings, 'USE_INNER_DTABLE_SERVER', True) - INNER_DTABLE_SERVER_URL = getattr(seahub_settings, 'INNER_DTABLE_SERVER_URL', 'http://127.0.0.1:5000/') - ARCHIVE_VIEW_EXPORT_ROW_LIMIT = getattr(seahub_settings, 'ARCHIVE_VIEW_EXPORT_ROW_LIMIT', 250000) - APP_TABLE_EXPORT_EXCEL_ROW_LIMIT = getattr(seahub_settings, 'APP_TABLE_EXPORT_EXCEL_ROW_LIMIT', 10000) - BIG_DATA_ROW_IMPORT_LIMIT = getattr(seahub_settings, 'BIG_DATA_ROW_IMPORT_LIMIT', 500000) - BIG_DATA_ROW_UPDATE_LIMIT = getattr(seahub_settings, 'BIG_DATA_ROW_UPDATE_LIMIT', 500000) - TRASH_CLEAN_AFTER_DAYS = getattr(seahub_settings, 'TRASH_CLEAN_AFTER_DAYS', 30) - LICENSE_PATH = getattr(seahub_settings, 'LICENSE_PATH', '/shared/seatable-license.txt') - IS_PRO_VERSION = getattr(seahub_settings, 'IS_PRO_VERSION', False) - ENABLE_OPERATION_LOG_DB = getattr(seahub_settings, 'ENABLE_OPERATION_LOG_DB', False) - AI_PRICES = getattr(seahub_settings, 'AI_PRICES', {}) - BAIDU_OCR_TOKENS = getattr(seahub_settings, 'BAIDU_OCR_TOKENS', {}) - UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_DAYS = getattr(seahub_settings, 'UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_DAYS', 7) - UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_NOTES = getattr(seahub_settings, 'UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_NOTES', 'auto backup') - DTABLE_STORAGE_SERVER_URL = getattr(seahub_settings, 
'DTABLE_STORAGE_SERVER_URL', 'http://127.0.0.1:6666') - INNER_SEATABLE_AI_SERVER_URL = getattr(seahub_settings, 'INNER_SEATABLE_AI_SERVER_URL', 'http://127.0.0.1:8888') - ENABLE_SEATABLE_AI = getattr(seahub_settings, 'ENABLE_SEATABLE_AI', False) - AUTO_RULES_AI_CONTENT_MAX_LENGTH = getattr(seahub_settings, 'AUTO_RULES_AI_CONTENT_MAX_LENGTH', 10000) - ORG_MEMBER_QUOTA_DEFAULT = getattr(seahub_settings, 'ORG_MEMBER_QUOTA_DEFAULT', 10) - - # env - CCNET_DB_NAME = os.environ.get('SEATABLE_MYSQL_DB_CCNET_DB_NAME', 'ccnet_db') - TIME_ZONE = os.environ.get('TIME_ZONE', 'UTC') -except Exception as e: - logger.critical("Can not import dtable_web settings: %s." % e) - raise RuntimeError("Can not import dtable_web settings: %s" % e) +central_conf_dir = os.environ.get('SEAFILE_CENTRAL_CONF_DIR', '') +yaml_file_path = os.path.join(central_conf_dir, os.environ.get('SEATABLE_CONFIG_NAME', 'seatable_config.yaml')) +configs = ConfigParser(yaml_file_path, 'dtable-events') + +def validate_llm_models(models): + if not models or not isinstance(models, list): + return [] + validated_models = [] + + for model in models: + if not isinstance(model, dict) or model.get('disable', False): + continue + if model.get('type') in ('proxy', 'other', 'hosted_vllm'): + required_fields = ('model', 'url') + else: + required_fields = ('model', 'key') + if not all(field in model for field in required_fields): + continue + model['label'] = model.get('label', model['model']) + validated_models.append(model) + + return validated_models + + +def get_llm_prices(models): + if not models or not isinstance(models, list): + return {} + prices = {} + for model in models: + if not isinstance(model, dict): + continue + model_name = model.get('model') + price = model.get('price') + if not model_name or not isinstance(price, dict): + continue + if 'input_tokens' not in price: + continue + prices[model_name] = { + 'input_tokens': price.get('input_tokens', 0), + 'output_tokens': price.get('output_tokens', 0), + } + return prices + +TIME_ZONE = configs.get('TIME_ZONE', default='UTC') +DTABLE_WEB_SERVICE_URL = configs.get('DTABLE_WEB_SERVICE_URL', default='http://127.0.0.1') + +DTABLE_PRIVATE_KEY = configs.get('JWT_PRIVATE_KEY', default='') +SECRET_KEY = configs.get('SECRET_KEY', default='') + +# mysql +SEATABLE_MYSQL_DB_HOST = configs.get('SEATABLE_MYSQL_DB_HOST', default='') +SEATABLE_MYSQL_DB_PORT = configs.get('SEATABLE_MYSQL_DB_PORT', default=3306) +SEATABLE_MYSQL_DB_USER = configs.get('SEATABLE_MYSQL_DB_USER', default='root') +SEATABLE_MYSQL_DB_PASSWORD = configs.get('SEATABLE_MYSQL_DB_PASSWORD', default='') +SEATABLE_MYSQL_DB_DTABLE_DB_NAME = configs.get('SEATABLE_MYSQL_DB_DTABLE_DB_NAME', default='dtable_db') +SEATABLE_MYSQL_DB_CCNET_DB_NAME = configs.get('SEATABLE_MYSQL_DB_CCNET_DB_NAME', default='ccnet_db') +SEATABLE_MYSQL_DB_SEAFILE_DB_NAME = configs.get('SEATABLE_MYSQL_DB_SEAFILE_DB_NAME', default='seafile_db') +ENABLE_OPERATION_LOG_DB = configs.get('ENABLE_OPERATION_LOG_DB', default=False) + +# redis +REDIS_HOST = configs.get('REDIS_HOST', default='127.0.0.1') +REDIS_PORT = configs.get('REDIS_PORT', default=6379) +REDIS_PASSWORD = configs.get('REDIS_PASSWORD', default='') + +# inner server url +INNER_DTABLE_SERVER_URL = configs.get('INNER_DTABLE_SERVER_URL', default='http://127.0.0.1:5000') +INNER_DTABLE_DB_URL = configs.get('INNER_DTABLE_DB_URL', default='http://127.0.0.1:7777') + +# file server +FILE_SERVER_ROOT = DTABLE_WEB_SERVICE_URL.strip('/') + '/seafhttp' +INNER_FILE_SERVER_ROOT = 'http://127.0.0.1:8082' + +# python runner 
+SEATABLE_FAAS_URL = configs.get('SEATABLE_FAAS_URL', default='') +ENABLE_PYTHON_SCRIPT = configs.get('ENABLE_PYTHON_SCRIPT', default=False) +SEATABLE_FAAS_AUTH_TOKEN = configs.get('SEATABLE_FAAS_AUTH_TOKEN', default='') + +# AI +ENABLE_SEATABLE_AI = configs.get('ENABLE_SEATABLE_AI', default=False) +INNER_SEATABLE_AI_SERVER_URL = configs.get('INNER_SEATABLE_AI_SERVER_URL', default='http://127.0.0.1:8888') +AUTO_RULES_AI_CONTENT_MAX_LENGTH = configs.get('AUTO_RULES_AI_CONTENT_MAX_LENGTH', default=10000) + +# AI models and prices +LLM_MODELS = [] +LLM_MODELS = validate_llm_models(configs.get('LLM_MODELS', LLM_MODELS)) +AI_PRICES = get_llm_prices(LLM_MODELS) +BAIDU_OCR_TOKENS = configs.get('BAIDU_OCR_TOKENS', default={}) + +# IO server +ARCHIVE_VIEW_EXPORT_ROW_LIMIT = configs.get('ARCHIVE_VIEW_EXPORT_ROW_LIMIT', default=250000) +APP_TABLE_EXPORT_EXCEL_ROW_LIMIT = configs.get('APP_TABLE_EXPORT_EXCEL_ROW_LIMIT', default=10000) +BIG_DATA_ROW_IMPORT_LIMIT = configs.get('BIG_DATA_ROW_IMPORT_LIMIT', default=500000) +BIG_DATA_ROW_UPDATE_LIMIT = configs.get('BIG_DATA_ROW_UPDATE_LIMIT', default=500000) + +# notices feature +ENABLE_WEIXIN = configs.get('ENABLE_WEIXIN', default=False) +ENABLE_WORK_WEIXIN = configs.get('ENABLE_WORK_WEIXIN', default=False) +ENABLE_DINGTALK = configs.get('ENABLE_DINGTALK', default=False) + +# trash clean +TRASH_CLEAN_AFTER_DAYS = configs.get('TRASH_CLEAN_AFTER_DAYS', default=30) + +# license +LICENSE_PATH = configs.get('LICENSE_PATH', default='/opt/seatable/seatable-license.txt') + +IS_PRO_VERSION = os.environ.get('IS_PRO_VERSION', default=True) + +# universal app snapshots +UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_DAYS = configs.get('UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_DAYS', default=7) + +# storage server +DTABLE_STORAGE_SERVER_URL = configs.get('DTABLE_STORAGE_SERVER_URL', default='http://127.0.0.1:6666') + +# org member quota +ORG_MEMBER_QUOTA_DEFAULT = configs.get('ORG_MEMBER_QUOTA_DEFAULT', 10) + +# log +LOG_LEVEL = configs.get('LOG_LEVEL', default='info') +LOG_DIR = configs.get('LOG_DIR', '/opt/seatable/logs') +SEATABLE_LOG_TO_STDOUT = configs.get('SEATABLE_LOG_TO_STDOUT', default=False) + +# IO server +IO_SERVER_HOST = configs.get('IO_SERVER_HOST', default='127.0.0.1') +IO_SERVER_PORT = configs.get('IO_SERVER_PORT', default=6000) +IO_SERVER_WORKERS = configs.get('IO_SERVER_WORKERS', default=3) +IO_SERVER_TASK_TIMEOUT = configs.get('IO_SERVER_TASK_TIMEOUT', default=3600) + +# instant notices sender +INSTANT_SENDER_INTERVAL = configs.get('INSTANT_SENDER_INTERVAL', default=60) + +# email notices sender +EMAIL_SENDER_ENABLED = configs.get('EMAIL_SENDER_ENABLED', default=True) +EMAIL_SENDER_INTERVAL = configs.get('EMAIL_SENDER_INTERVAL', default=60*60) + +# updates sender +UPDATES_SENDER_ENABLED = configs.get('UPDATES_SENDER_ENABLED', default=True) + +# notification rules scanner +NOTIFICATION_RULES_SCAN_ENABLED = configs.get('NOTIFICATION_RULES_SCAN_ENABLED', default=True) + +# LDAP sync +LDAP_SYNC_ENABLED = configs.get('LDAP_SYNC_ENABLED', default=False) +LDAP_SYNC_INTERVAL = configs.get('LDAP_SYNC_INTERVAL', default=60*60) + +# common dataset syncer +COMMON_DATASET_SYNCER_ENABLED = configs.get('COMMON_DATASET_SYNCER_ENABLED', default=True) + +# clean db +CLEAN_DB_ENABLED = configs.get('CLEAN_DB_ENABLED', default=True) +CLEAN_DB_KEEP_DTABLE_SNAPSHOT_DAYS = configs.get('CLEAN_DB_KEEP_DTABLE_SNAPSHOT_DAYS', default=365) +CLEAN_DB_KEEP_ACTIVITIES_DAYS = configs.get('CLEAN_DB_KEEP_ACTIVITIES_DAYS', default=30) +CLEAN_DB_KEEP_OPERATION_LOG_DAYS = 
configs.get('CLEAN_DB_KEEP_OPERATION_LOG_DAYS', default=14) +CLEAN_DB_KEEP_DELETE_OPERATION_LOG_DAYS = configs.get('CLEAN_DB_KEEP_DELETE_OPERATION_LOG_DAYS', default=30) +CLEAN_DB_KEEP_DTABLE_DB_OP_LOG_DAYS = configs.get('CLEAN_DB_KEEP_DTABLE_DB_OP_LOG_DAYS', default=30) +CLEAN_DB_KEEP_NOTIFICATIONS_USERNOTIFICATION_DAYS = configs.get('CLEAN_DB_KEEP_NOTIFICATIONS_USERNOTIFICATION_DAYS', default=30) +CLEAN_DB_KEEP_DTABLE_NOTIFICATIONS_DAYS = configs.get('CLEAN_DB_KEEP_DTABLE_NOTIFICATIONS_DAYS', default=30) +CLEAN_DB_KEEP_SESSION_LOG_DAYS = configs.get('CLEAN_DB_KEEP_SESSION_LOG_DAYS', default=30) +CLEAN_DB_KEEP_AUTO_RULES_TASK_LOG_DAYS = configs.get('CLEAN_DB_KEEP_AUTO_RULES_TASK_LOG_DAYS', default=30) +CLEAN_DB_KEEP_USER_ACTIVITY_STATISTICS_DAYS = configs.get('CLEAN_DB_KEEP_USER_ACTIVITY_STATISTICS_DAYS', default=0) +CLEAN_DB_KEEP_DTABLE_APP_PAGES_OPERATION_LOG_DAYS = configs.get('CLEAN_DB_KEEP_DTABLE_APP_PAGES_OPERATION_LOG_DAYS', default=14) + +# email syncer +EMAIL_SYNCER_ENABLED = configs.get('EMAIL_SYNCER_ENABLED', default=True) +EMAIL_SYNCER_MAX_WORKERS = configs.get('EMAIL_SYNCER_MAX_WORKERS', default=5) + +# workflow scanner +WORKFLOW_SCANNER_ENABLED = configs.get('WORKFLOW_SCANNER_ENABLED', default=True) + +# AI stats +AI_STATS_ENABLED = configs.get('AI_STATS_ENABLED', default=True) + +# automation +AUTOMATION_WORKERS = configs.get('AUTOMATION_WORKERS', default=5) +AUTOMATION_RATE_LIMIT_WINDOW_SECS = configs.get('AUTOMATION_RATE_LIMIT_WINDOW_SECS', default=300) +AUTOMATION_RATE_LIMIT_PERCENT = configs.get('AUTOMATION_RATE_LIMIT_PERCENT', default=0.25) + +# playwright +CONVERT_PDF_BROWSERS = configs.get('CONVERT_PDF_BROWSERS', default=2) +CONVERT_PDF_SESSIONS_PER_BROWSER = configs.get('CONVERT_PDF_SESSIONS_PER_BROWSER', default=3) + +# virus scan +VIRUS_SCAN_ENABLED = configs.get('VIRUS_SCAN_ENABLED', default=False) +VIRUS_SCAN_SCAN_COMMAND = configs.get('VIRUS_SCAN_SCAN_COMMAND') +VIRUS_SCAN_VIRUS_CODE = configs.get('VIRUS_SCAN_VIRUS_CODE') +VIRUS_SCAN_NONVIRUS_CODE = configs.get('VIRUS_SCAN_NONVIRUS_CODE') +VIRUS_SCAN_SCAN_INTERVAL = configs.get('VIRUS_SCAN_SCAN_INTERVAL', default=60) +VIRUS_SCAN_SCAN_SIZE_LIMIT = configs.get('VIRUS_SCAN_SCAN_SIZE_LIMIT', default=20) +VIRUS_SCAN_SCAN_SKIP_EXT = configs.get('VIRUS_SCAN_SCAN_SKIP_EXT') +VIRUS_SCAN_THREADS = configs.get('VIRUS_SCAN_THREADS', default=4) def get_config(config_file): config = configparser.ConfigParser()
diff --git a/dtable_events/app/config_parser.py b/dtable_events/app/config_parser.py new file mode 100644 index 00000000..35e6d76a --- /dev/null +++ b/dtable_events/app/config_parser.py @@ -0,0 +1,70 @@ +import os +import yaml +import json +import logging + +logger = logging.getLogger(__name__) +def _read_yaml(yaml_file_path=None, component_name=None): + if yaml_file_path: + if not os.path.isfile(yaml_file_path) or (not yaml_file_path.endswith('yml') and not yaml_file_path.endswith('yaml')): + logger.warning(f'{yaml_file_path} does not exist or is not a valid YAML file') + return {} + + # first read, get vars from global + configs = {} + with open(yaml_file_path, 'r', encoding='utf-8') as yaml_file: + current_yaml_config = yaml.safe_load(yaml_file) or {} + configs = current_yaml_config.get('global', {}) + if component_name: + component_config = current_yaml_config.get(component_name, {}) + configs.update(component_config) + if 'from_yaml' in component_config: + del configs['from_yaml'] + configs.update(_read_yaml(component_config['from_yaml'])) + return configs + return {} + +def _check_type(func): + def wrapper(self, key, default=None, check_type=True): + result = func(self, key, default) + if check_type: + need_type = type(default) + if need_type in (int, float): + try: + result = need_type(result) + except: + raise ValueError(f'Type of {key} must be a number') + elif need_type == bool: + if isinstance(result, str): + result = result.lower() in ('true', '1') + else: + result = bool(result) + elif need_type in (str, list, dict): + result = need_type(result) + return result + return wrapper + +class ConfigParser(object): + def __init__(self, yaml_file_path, component_name): + assert yaml_file_path and component_name, "yaml_file_path and component_name must be specified when initializing ConfigParser" + self.refresh_yaml_configs(yaml_file_path, component_name) + + def refresh_yaml_configs(self, yaml_file_path=None, component_name=None): + self.yaml_file_path = yaml_file_path or self.yaml_file_path + self.component_name = component_name or self.component_name + try: + self.yaml_configs = _read_yaml(self.yaml_file_path, self.component_name) + except Exception as e: + logger.error(f'Failed to read YAML config file: {e}') + raise + + @_check_type + def get(self, key, default=None): + if key in os.environ: + value = os.getenv(key) + try: + value = json.loads(value) + except: + pass + return value + return self.yaml_configs.get(key, default)
diff --git a/dtable_events/app/event_redis.py b/dtable_events/app/event_redis.py index f747a535..5dbe73d7 100644 --- a/dtable_events/app/event_redis.py +++ b/dtable_events/app/event_redis.py @@ -6,17 +6,19 @@ import time import redis +from dtable_events.app.config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD + logger = logging.getLogger(__name__) REDIS_METRIC_KEY = 'metric' class RedisClient(object): - def __init__(self, config, socket_connect_timeout=30, socket_timeout=None): + def __init__(self, socket_connect_timeout=30, socket_timeout=None): self._host = '127.0.0.1' self._port = 6379 self._password = None - self._parse_config(config) + self._parse_config() """ By default, each Redis instance created will in turn create its own connection pool.
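For context on the new configuration mechanism added above in config_parser.py: ConfigParser merges the YAML file's global section with the named component section, lets any key be overridden by an environment variable, and coerces the result to the type of the supplied default. A minimal sketch, assuming an illustrative path and seatable_config.yaml (values below are examples, not shipped defaults):

    # Assumed example YAML (illustrative only):
    #   global:
    #     TIME_ZONE: Europe/Berlin
    #   dtable-events:
    #     LOG_LEVEL: debug
    #     IO_SERVER_WORKERS: "5"
    import os
    from dtable_events.app.config_parser import ConfigParser

    configs = ConfigParser('/opt/seatable/seatable_config.yaml', 'dtable-events')
    configs.get('TIME_ZONE', default='UTC')        # 'Europe/Berlin' (from the global section)
    configs.get('LOG_LEVEL', default='info')       # 'debug' (component section overrides global)
    configs.get('IO_SERVER_WORKERS', default=3)    # 5, coerced to int because the default is an int

    os.environ['IO_SERVER_WORKERS'] = '8'
    configs.get('IO_SERVER_WORKERS', default=3)    # 8, environment variables win over the YAML file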
@@ -28,29 +30,11 @@ def __init__(self, config, socket_connect_timeout=30, socket_timeout=None): decode_responses=True ) - def _parse_config(self, config): - - if not (redis_host := os.getenv('REDIS_HOST')): - if config.has_option('REDIS', 'host'): - self._host = config.get('REDIS', 'host') - else: - self._host = redis_host + def _parse_config(self): - if not (redis_port := os.getenv('REDIS_PORT')): - if config.has_option('REDIS', 'port'): - self._port = config.getint('REDIS', 'port') - else: - self._port = redis_port - try: - self._port = int(self._port) - except: - raise ValueError(f'Invalid redis port: {self._port}') - - if not (redis_password := os.getenv('REDIS_PASSWORD')): - if config.has_option('REDIS', 'password'): - self._password = config.get('REDIS', 'password') - else: - self._password = redis_password + self._host = REDIS_HOST + self._port = REDIS_PORT + self._password = REDIS_PASSWORD def get_subscriber(self, channel_name): while True: @@ -86,8 +70,8 @@ class RedisCache(object): def __init__(self): self._redis_client = None - def init_redis(self, config): - self._redis_client = RedisClient(config) + def init_redis(self): + self._redis_client = RedisClient() def get(self, key): return self._redis_client.get(key) diff --git a/dtable_events/app/log.py b/dtable_events/app/log.py index 3085cf01..5f7bd643 100644 --- a/dtable_events/app/log.py +++ b/dtable_events/app/log.py @@ -4,6 +4,8 @@ import logging from logging import handlers +from dtable_events.app.config import LOG_LEVEL, LOG_DIR, SEATABLE_LOG_TO_STDOUT + def _get_log_level(level): if level == 'debug': @@ -28,19 +30,16 @@ def get_format(component=None, fmt=None): class LogConfigurator(object): - def __init__(self, level, logfile=None): - self._level = _get_log_level(level) - self._logfile = logfile + def __init__(self): + self._level = _get_log_level(LOG_LEVEL) + self._logfile = os.path.join(LOG_DIR, 'dtable-events.log') - if logfile is None: - self._basic_config() - else: - self._rotating_config() + self._rotating_config() def _rotating_config(self): logging.root.setLevel(self._level) - if os.environ.get('SEATABLE_LOG_TO_STDOUT', 'false') == 'true': + if SEATABLE_LOG_TO_STDOUT: # logs to stdout stdout_formatter = logging.Formatter(get_format(component='dtable-events'), datefmt="%Y-%m-%d %H:%M:%S") stdout_handler = logging.StreamHandler() @@ -55,22 +54,6 @@ def _rotating_config(self): logging.root.addHandler(file_handler) - def _basic_config(self): - # Log to stdout. Mainly for development. 
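RedisClient (changed above in event_redis.py) now reads REDIS_HOST, REDIS_PORT and REDIS_PASSWORD from dtable_events.app.config instead of a per-caller config object. A minimal usage sketch; the channel name is the one the automations pipeline subscribes to elsewhere in this patch:

    from dtable_events.app.event_redis import RedisClient

    client = RedisClient(socket_timeout=10)                          # connection settings come from config/env
    subscriber = client.get_subscriber('automation-rule-triggered')  # blocks/retries until Redis is reachable
    message = subscriber.get_message()                               # None while the channel is idle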
- if os.environ.get('SEATABLE_LOG_TO_STDOUT', 'false') == 'true': - component = 'dtable-events' - else: - component = None - kw = { - 'format': get_format(component=component), - 'datefmt': '%m/%d/%Y %H:%M:%S', - 'level': self._level, - 'stream': sys.stdout - } - - logging.basicConfig(**kw) - - def setup_logger(logname, fmt=None, level=None, propagate=None): """ setup logger for dtable io @@ -79,7 +62,7 @@ def setup_logger(logname, fmt=None, level=None, propagate=None): if propagate is not None: logger.propagate = propagate - if os.environ.get('SEATABLE_LOG_TO_STDOUT', 'false') == 'true': + if SEATABLE_LOG_TO_STDOUT: # logs to stdout logger_component_name = logname stdout_handler = logging.StreamHandler() diff --git a/dtable_events/app/stats_sender.py b/dtable_events/app/stats_sender.py index 6bec586a..6c98582c 100644 --- a/dtable_events/app/stats_sender.py +++ b/dtable_events/app/stats_sender.py @@ -10,9 +10,9 @@ class StatsSender: CDS_CHANNEL = 'stats_cds' - def __init__(self, config): + def __init__(self): self.config = None - self._redis_client = RedisClient(config) + self._redis_client = RedisClient() def send(self, channel: str, info: dict): try: diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index 6dc753e5..a7159a2c 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -9,7 +9,8 @@ from apscheduler.schedulers.blocking import BlockingScheduler from sqlalchemy import text -from dtable_events.app.config import DTABLE_WEB_SERVICE_URL +from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, AUTOMATION_RATE_LIMIT_PERCENT, \ + AUTOMATION_RATE_LIMIT_WINDOW_SECS, AUTOMATION_WORKERS from dtable_events.app.event_redis import RedisClient from dtable_events.app.log import auto_rule_logger from dtable_events.automations.actions import AutomationRule, AutomationResult @@ -66,14 +67,14 @@ def get_percent(self, owner, org_id, workers): class AutomationsPipeline: - def __init__(self, config): + def __init__(self): self.workers = 5 self.automations_queue: Queue[AutomationRule] = Queue() self.results_queue: Queue[AutomationResult] = Queue() - self._db_session_class = init_db_session_class(config) + self._db_session_class = init_db_session_class() - self._redis_client = RedisClient(config, socket_timeout=10) + self._redis_client = RedisClient(socket_timeout=10) self.per_update_channel = 'automation-rule-triggered' self.rate_limiter = RateLimiter() @@ -110,22 +111,11 @@ def add_exceed_system_resource_limit_entity(self, owner, org_id): self.exceed_system_resource_limit_entities['owners_map'][owner] = 1 def parse_config(self): - try: - self.workers = int(os.environ.get('AUTOMATION_WORKERS', self.workers)) - except: - pass + self.workers = AUTOMATION_WORKERS - try: - rate_limit_window_secs = int(os.environ.get('AUTOMATION_RATE_LIMIT_WINDOW_SECS', '300')) - self.rate_limiter.window_secs = rate_limit_window_secs - except: - pass + self.rate_limiter.window_secs = AUTOMATION_RATE_LIMIT_WINDOW_SECS - try: - rate_limit_percent = float(os.environ.get('AUTOMATION_RATE_LIMIT_PERCENT', '0.25')) - self.rate_limiter.percent = rate_limit_percent - except: - pass + self.rate_limiter.percent = AUTOMATION_RATE_LIMIT_PERCENT def publish_metrics(self): while True: diff --git a/dtable_events/automations/automations_stats_manager.py b/dtable_events/automations/automations_stats_manager.py index 956d716b..97712852 100644 --- a/dtable_events/automations/automations_stats_manager.py +++ 
b/dtable_events/automations/automations_stats_manager.py @@ -8,7 +8,7 @@ from dtable_events.notification_rules.notification_rules_utils import send_notification from dtable_events.utils import get_dtable_admins -from dtable_events.app.config import CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL, ORG_MEMBER_QUOTA_DEFAULT +from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL, ORG_MEMBER_QUOTA_DEFAULT from dtable_events.automations.actions import AutomationResult from dtable_events.utils.dtable_web_api import DTableWebAPI @@ -19,7 +19,7 @@ def __init__(self): self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) self.roles = None - self.ccnet_db_name = CCNET_DB_NAME + self.ccnet_db_name = SEATABLE_MYSQL_DB_CCNET_DB_NAME def get_roles(self): if self.roles: diff --git a/dtable_events/automations/utils.py b/dtable_events/automations/utils.py index fd136234..3e6f68cd 100644 --- a/dtable_events/automations/utils.py +++ b/dtable_events/automations/utils.py @@ -2,8 +2,8 @@ from dtable_events.app.log import auto_rule_logger from dtable_events.automations.actions import AutomationRule, auto_rule_logger -def run_auto_rule_task(trigger, actions, options, config): - db_session = init_db_session_class(config)() +def run_auto_rule_task(trigger, actions, options): + db_session = init_db_session_class()() try: auto_rule_logger.info('start to run test auto rule: %s', options['rule_id']) auto_rule = AutomationRule(None, trigger, actions, options) diff --git a/dtable_events/ccnet/organization.py b/dtable_events/ccnet/organization.py index a092b29e..e4bc6735 100644 --- a/dtable_events/ccnet/organization.py +++ b/dtable_events/ccnet/organization.py @@ -1,10 +1,10 @@ from sqlalchemy import text -from dtable_events.app.config import CCNET_DB_NAME +from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME def get_org_admins(db_session, org_id): - sql = f"SELECT `email` FROM `{CCNET_DB_NAME}`.`OrgUser` WHERE `org_id`=:org_id AND `is_staff`=1" + sql = f"SELECT `email` FROM `{SEATABLE_MYSQL_DB_CCNET_DB_NAME}`.`OrgUser` WHERE `org_id`=:org_id AND `is_staff`=1" rows = db_session.execute(text(sql), {'org_id': org_id}) admins = [row.email for row in rows] return admins diff --git a/dtable_events/ccnet/user.py b/dtable_events/ccnet/user.py index d084eca9..82218f7a 100644 --- a/dtable_events/ccnet/user.py +++ b/dtable_events/ccnet/user.py @@ -1,10 +1,10 @@ from sqlalchemy import text -from dtable_events.app.config import CCNET_DB_NAME +from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME def get_user_role(db_session, username): - sql = f"SELECT `role` FROM `{CCNET_DB_NAME}`.`UserRole` WHERE `email`=:email" + sql = f"SELECT `role` FROM `{SEATABLE_MYSQL_DB_CCNET_DB_NAME}`.`UserRole` WHERE `email`=:email" row = db_session.execute(text(sql), {'email': username}).fetchone() if not row: return 'default' diff --git a/dtable_events/common_dataset/common_dataset_syncer.py b/dtable_events/common_dataset/common_dataset_syncer.py index 35f2dcc2..e025371c 100644 --- a/dtable_events/common_dataset/common_dataset_syncer.py +++ b/dtable_events/common_dataset/common_dataset_syncer.py @@ -10,30 +10,19 @@ from sqlalchemy import text from dtable_events import init_db_session_class -from dtable_events.app.config import DTABLE_PRIVATE_KEY +from dtable_events.app.config import COMMON_DATASET_SYNCER_ENABLED from dtable_events.common_dataset.common_dataset_sync_utils import batch_sync_common_dataset, cds_logger -from dtable_events.utils import get_opt_from_conf_or_env, parse_bool class 
CommonDatasetSyncer(object): - def __init__(self, app, config): + def __init__(self, app): self.app = app self._enabled = True - self._prepara_config(config) - self._db_session_class = init_db_session_class(config) + self._prepara_config() + self._db_session_class = init_db_session_class() - def _prepara_config(self, config): - section_name = 'COMMON DATASET SYNCER' - key_enabled = 'enabled' - - if not config.has_section(section_name): - section_name = 'COMMON-DATASET-SYNCER' - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - self._enabled = parse_bool(enabled) + def _prepara_config(self): + self._enabled = COMMON_DATASET_SYNCER_ENABLED def start(self): if not self.is_enabled(): diff --git a/dtable_events/convert_page/manager.py b/dtable_events/convert_page/manager.py index 149e7a81..340bdb08 100644 --- a/dtable_events/convert_page/manager.py +++ b/dtable_events/convert_page/manager.py @@ -20,6 +20,7 @@ import psutil from playwright.async_api import async_playwright, Playwright, Browser, BrowserContext +from dtable_events.app.config import CONVERT_PDF_BROWSERS, CONVERT_PDF_SESSIONS_PER_BROWSER from dtable_events.convert_page.utils import wait_for_images logger = logging.getLogger(__name__) @@ -541,16 +542,8 @@ async def handle_console(msg): def get_playwright_manager(): global playwright_manager if not playwright_manager: - try: - num_browsers = int(os.environ.get('CONVERT_PDF_BROWSERS', '2')) - except: - num_browsers = 2 - try: - contexts_per_browser = int(os.environ.get('CONVERT_PDF_SESSIONS_PER_BROWSER', '3')) - except: - contexts_per_browser = 3 playwright_manager = RobustPlaywrightManager( - num_browsers=num_browsers, - contexts_per_browser=contexts_per_browser + num_browsers=CONVERT_PDF_BROWSERS, + contexts_per_browser=CONVERT_PDF_SESSIONS_PER_BROWSER ) return playwright_manager diff --git a/dtable_events/data_sync/data_syncer.py b/dtable_events/data_sync/data_syncer.py index cadb0746..61f0ed77 100644 --- a/dtable_events/data_sync/data_syncer.py +++ b/dtable_events/data_sync/data_syncer.py @@ -8,39 +8,22 @@ from sqlalchemy import text from dtable_events import init_db_session_class +from dtable_events.app.config import EMAIL_SYNCER_ENABLED, EMAIL_SYNCER_MAX_WORKERS from dtable_events.data_sync.data_sync_utils import run_sync_emails -from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, uuid_str_to_36_chars +from dtable_events.utils import uuid_str_to_36_chars class DataSyncer(object): - def __init__(self, config): + def __init__(self): self._enabled = True self._max_workers = 5 - self._prepara_config(config) - self._db_session_class = init_db_session_class(config) - - def _prepara_config(self, config): - section_name = 'EMAIL SYNCER' - key_enabled = 'enabled' - key_max_workers = 'max_workers' - - if not config.has_section(section_name): - section_name = 'EMAIL-SYNCER' - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - self._enabled = parse_bool(enabled) - # max workers - max_workers = get_opt_from_conf_or_env(config, section_name, key_max_workers, default=5) - try: - self._max_workers = int(max_workers) - except: - pass - finally: - self._max_workers = min(32, self._max_workers) + self._prepara_config() + self._db_session_class = init_db_session_class() + + def _prepara_config(self): + self._enabled = EMAIL_SYNCER_ENABLED + self._max_workers = 
EMAIL_SYNCER_MAX_WORKERS def start(self): if not self.is_enabled(): diff --git a/dtable_events/db.py b/dtable_events/db.py index c0b368fb..2039dcce 100644 --- a/dtable_events/db.py +++ b/dtable_events/db.py @@ -9,6 +9,10 @@ from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import sessionmaker +from dtable_events.app.config import SEATABLE_MYSQL_DB_HOST, SEATABLE_MYSQL_DB_PORT, SEATABLE_MYSQL_DB_USER, \ + SEATABLE_MYSQL_DB_PASSWORD, SEATABLE_MYSQL_DB_DTABLE_DB_NAME, SEATABLE_MYSQL_DB_CCNET_DB_NAME, \ + SEATABLE_MYSQL_DB_SEAFILE_DB_NAME + logger = logging.getLogger(__name__) @@ -19,130 +23,26 @@ class Base(DeclarativeBase): SeafBase = automap_base() -def create_engine_from_conf(config): - backend = config.get('DATABASE', 'type') if config.has_section('DATABASE') and config.has_option('DATABASE', 'type') else 'mysql' - - if backend == 'mysql': - if not (host := os.getenv('SEATABLE_MYSQL_DB_HOST')): - if config.has_option('DATABASE', 'host'): - host = config.get('DATABASE', 'host').lower() - else: - host = 'localhost' - - if not (port := os.getenv('SEATABLE_MYSQL_DB_PORT')): - if config.has_option('DATABASE', 'port'): - port = config.getint('DATABASE', 'port') - else: - port = 3306 - - try: - port = int(port) - except: - raise ValueError(f'Invalid database port: {port}') - - if not (username := os.getenv('SEATABLE_MYSQL_DB_USER')): - username = config.get('DATABASE', 'username') - - if not (password := os.getenv('SEATABLE_MYSQL_DB_PASSWORD')): - password = config.get('DATABASE', 'password') - - if not (db_name := os.getenv('SEATABLE_MYSQL_DB_DTABLE_DB_NAME')): - db_name = config.get('DATABASE', 'db_name') - - db_url = "mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8" % \ - (username, quote_plus(password), host, port, db_name) - logger.debug('[dtable_events] database: mysql, name: %s', db_name) - else: - logger.error("Unknown database backend: %s" % backend) - raise RuntimeError("Unknown database backend: %s" % backend) - - # Add pool recycle, or mysql connection will be closed - # by mysql daemon if idle for too long. 
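In the rewritten db.py below, create_engine_from_conf() only selects the database name from its db argument ('dtable', 'seafile' or 'ccnet'); host, port and credentials always come from the SEATABLE_MYSQL_DB_* settings, and ccnet tables are reached by prefixing the database name in SQL as in ccnet/organization.py above. A minimal sketch of the resulting session factories, assuming the MySQL settings are populated:

    from sqlalchemy import text
    from dtable_events.db import init_db_session_class, init_seafile_db_session_class

    DTableSession = init_db_session_class()            # engine for SEATABLE_MYSQL_DB_DTABLE_DB_NAME
    SeafileSession = init_seafile_db_session_class()   # engine for SEATABLE_MYSQL_DB_SEAFILE_DB_NAME

    session = DTableSession()
    try:
        session.execute(text('SELECT 1'))
    finally:
        session.close()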
- """MySQL has gone away - https://docs.sqlalchemy.org/en/20/faq/connections.html#mysql-server-has-gone-away - https://docs.sqlalchemy.org/en/20/core/pooling.html#pool-disconnects - """ - kwargs = dict(pool_recycle=300, pool_pre_ping=True, echo=False, echo_pool=False) - - engine = create_engine(db_url, **kwargs) - - return engine - - -def create_seafile_engine_from_conf(config): - backend = config.get('DATABASE', 'type') if config.has_section('DATABASE') and config.has_option('DATABASE', 'type') else 'mysql' - - if backend == 'mysql': - if not (host := os.getenv('SEATABLE_MYSQL_DB_HOST')): - if config.has_option('database', 'host'): - host = config.get('database', 'host').lower() - else: - host = 'localhost' - - if not (port := os.getenv('SEATABLE_MYSQL_DB_PORT')): - if config.has_option('database', 'port'): - port = config.getint('database', 'port') - else: - port = 3306 - - try: - port = int(port) - except: - raise ValueError(f'Invalid database port: {port}') - - if not (username := os.getenv('SEATABLE_MYSQL_DB_USER')): - username = config.get('database', 'user') - - if not (password := os.getenv('SEATABLE_MYSQL_DB_PASSWORD')): - password = config.get('database', 'password') - - if not (db_name := os.getenv('SEATABLE_MYSQL_DB_SEAFILE_DB_NAME')): - db_name = config.get('database', 'db_name') - - db_url = "mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8" % \ - (username, quote_plus(password), host, port, db_name) - logger.debug('[dtable_events] seafile database: mysql, name: %s', db_name) - else: - logger.error("Unknown seafile database backend: %s" % backend) - raise RuntimeError("Unknown seafile database backend: %s" % backend) - - # Add pool recycle, or mysql connection will be closed - # by mysql daemon if idle for too long. - """MySQL has gone away - https://docs.sqlalchemy.org/en/20/faq/connections.html#mysql-server-has-gone-away - https://docs.sqlalchemy.org/en/20/core/pooling.html#pool-disconnects - """ - kwargs = dict(pool_recycle=300, pool_pre_ping=True, echo=False, echo_pool=False) - - engine = create_engine(db_url, **kwargs) - - return engine - - -def create_operation_log_db_engine_from_conf(config): - backend = config.get('DATABASE', 'type') - - if backend == 'mysql': - if config.has_option('DATABASE', 'operation_log_db_host'): - host = config.get('DATABASE', 'operation_log_db_host').lower() - else: - host = 'localhost' +def create_engine_from_conf(db): + db_name = '' + if db == 'dtable': + db_name = SEATABLE_MYSQL_DB_DTABLE_DB_NAME + elif db == 'seafile': + db_name = SEATABLE_MYSQL_DB_SEAFILE_DB_NAME + elif db == 'ccnet': + db_name = SEATABLE_MYSQL_DB_CCNET_DB_NAME - if config.has_option('DATABASE', 'operation_log_db_port'): - port = config.getint('DATABASE', 'operation_log_db_port') - else: - port = 3306 + db_host = SEATABLE_MYSQL_DB_HOST + db_port = SEATABLE_MYSQL_DB_PORT + db_user = SEATABLE_MYSQL_DB_USER + db_pwd = SEATABLE_MYSQL_DB_PASSWORD - username = config.get('DATABASE', 'operation_log_db_username') - password = config.get('DATABASE', 'operation_log_db_password') - db_name = config.get('DATABASE', 'operation_log_db_name') + if not (db_name and db_host and db_port and db_user): + raise RuntimeError('Database configured error') - db_url = "mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8" % \ - (username, quote_plus(password), host, port, db_name) - logger.debug('[dtable_events] database: mysql, name: %s', db_name) - else: - logger.error("Unknown database backend: %s" % backend) - raise RuntimeError("Unknown database backend: %s" % backend) + db_url = 
"mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8" % \ + (db_user, quote_plus(db_pwd), db_host, db_port, db_name) + logger.debug('[dtable_events] database: mysql, name: %s', db_name) # Add pool recycle, or mysql connection will be closed # by mysql daemon if idle for too long. @@ -157,10 +57,10 @@ def create_operation_log_db_engine_from_conf(config): return engine -def init_db_session_class(config): +def init_db_session_class(): """Configure session class for mysql according to the config file.""" try: - engine = create_engine_from_conf(config) + engine = create_engine_from_conf('dtable') except (configparser.NoOptionError, configparser.NoSectionError) as e: logger.error("Init db session class error: %s" % e) raise RuntimeError("Init db session class error: %s" % e) @@ -169,10 +69,10 @@ def init_db_session_class(config): return session -def init_seafile_db_session_class(config): +def init_seafile_db_session_class(): """Configure session class for mysql according to the config file.""" try: - engine = create_seafile_engine_from_conf(config) + engine = create_engine_from_conf('seafile') except (configparser.NoOptionError, configparser.NoSectionError) as e: logger.error("Init operation-log db session class error: %s" % e) raise RuntimeError("Init operation-log db session class error: %s" % e) @@ -181,10 +81,10 @@ def init_seafile_db_session_class(config): return session -def create_db_tables(config): +def create_db_tables(): # create events tables if not exists. try: - engine = create_engine_from_conf(config) + engine = create_engine_from_conf('dtable') except (configparser.NoOptionError, configparser.NoSectionError) as e: logger.error("Create tables error: %s" % e) raise RuntimeError("Create tables error: %s" % e) @@ -193,10 +93,10 @@ def create_db_tables(config): engine.dispose() -def prepare_seafile_tables(seafile_config): +def prepare_seafile_tables(): # reflect the seafile_db tables try: - engine = create_seafile_engine_from_conf(seafile_config) + engine = create_engine_from_conf('seafile') except Exception as e: logger.error(e) raise RuntimeError("create db engine error: %s" % e) diff --git a/dtable_events/dtable_io/__init__.py b/dtable_events/dtable_io/__init__.py index fd7eb2df..41f39278 100644 --- a/dtable_events/dtable_io/__init__.py +++ b/dtable_events/dtable_io/__init__.py @@ -57,7 +57,7 @@ def add_task_id_to_log(log_msg, task_id=None): return f'task [{task_id}] - {log_msg}' if task_id else log_msg -def get_dtable_export_content(username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, config, task_id): +def get_dtable_export_content(username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, task_id): """ 1. prepare file content at /tmp/dtable-io//dtable_asset/... 2. 
make zip file @@ -73,7 +73,7 @@ def get_dtable_export_content(username, repo_id, workspace_id, dtable_uuid, asse 'dtable_asset/') # used to store asset files and json from file_server tmp_zip_path = os.path.join('/tmp/dtable-io', dtable_uuid, 'zip_file') + '.zip' # zip path of zipped xxx.dtable - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() dtable_io_logger.info(add_task_id_to_log('Clear tmp dirs and files before prepare.', task_id)) clear_tmp_files_and_dirs(tmp_file_path, tmp_zip_path) @@ -206,7 +206,7 @@ def get_dtable_export_content(username, repo_id, workspace_id, dtable_uuid, asse return task_result -def get_dtable_export_content_folder(username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, config, folder_path, task_id): +def get_dtable_export_content_folder(username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, folder_path, task_id): """ like `get_dtable_export_content` but to export to `folder_path` and not to archive """ @@ -214,7 +214,7 @@ def get_dtable_export_content_folder(username, repo_id, workspace_id, dtable_uui 'warnings': [] } - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() # if not os.path.isdir(folder_path): if not os.path.exists(folder_path): @@ -295,7 +295,7 @@ def get_dtable_export_content_folder(username, repo_id, workspace_id, dtable_uui def post_dtable_import_files(username, repo_id, workspace_id, dtable_uuid, dtable_file_name, in_storage, - can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, config, task_id): + can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, task_id): """ post files at /tmp//dtable_zip_extracted/ to file server unzip django uploaded tmp file is suppose to be done in dtable-web api. 
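The dtable_io entry points above repeatedly use the idiom init_db_session_class()(): the first call builds a sessionmaker from the module-level settings, the second immediately opens a session for the task. A sketch of the equivalent spelled-out form:

    from dtable_events.db import init_db_session_class

    SessionClass = init_db_session_class()   # sessionmaker bound to the dtable_db engine
    db_session = SessionClass()              # what init_db_session_class()() returns in one step
    try:
        pass  # run the task's queries here
    finally:
        db_session.close()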
@@ -304,7 +304,7 @@ def post_dtable_import_files(username, repo_id, workspace_id, dtable_uuid, dtabl extracted_path = os.path.join('/tmp/dtable-io', dtable_uuid, 'dtable_zip_extracted') - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() dtable_io_logger.info(add_task_id_to_log('Prepare dtable json file and post it at file server.', task_id)) try: @@ -397,14 +397,14 @@ def post_dtable_import_files(username, repo_id, workspace_id, dtable_uuid, dtabl def post_dtable_import_files_folder(username, repo_id, workspace_id, dtable_uuid, folder_path, in_storage, - can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, config, task_id): + can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, task_id): """ like `post_dtable_import_files` but import from `folder_path` and not to remove folder_path """ dtable_io_logger.info(add_task_id_to_log(f'Start import DTable: {dtable_uuid}.', task_id)) dtable_file_name = os.path.basename(folder_path) - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() dtable_io_logger.info(add_task_id_to_log('Prepare dtable json file and post it at file server.', task_id)) try: @@ -477,7 +477,7 @@ def post_dtable_import_files_folder(username, repo_id, workspace_id, dtable_uuid dtable_io_logger.info(add_task_id_to_log(f'Import DTable: {dtable_uuid} success!', task_id)) -def get_dtable_export_asset_files(username, repo_id, dtable_uuid, files, task_id, config, files_map=None): +def get_dtable_export_asset_files(username, repo_id, dtable_uuid, files, task_id, files_map=None): """ export asset files from dtable """ @@ -500,7 +500,7 @@ def get_dtable_export_asset_files(username, repo_id, dtable_uuid, files, task_id clear_tmp_files_and_dirs(tmp_file_path, tmp_zip_path) os.makedirs(tmp_file_path, exist_ok=True) - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() try: # 1. download files to tmp_file_path download_files_to_path(username, repo_id, dtable_uuid, files, tmp_file_path, db_session, files_map) @@ -547,7 +547,7 @@ def import_big_data_screen(username, repo_id, dtable_uuid, page_id): dtable_io_logger.error('rm extracted tmp file failed. 
ERROR: {}'.format(e)) -def get_dtable_export_big_data_screen_app(username, repo_id, dtable_uuid, app_uuid, app_id, task_id, config): +def get_dtable_export_big_data_screen_app(username, repo_id, dtable_uuid, app_uuid, app_id, task_id): """ parse json file in big data screen, and zip it for download """ @@ -556,7 +556,7 @@ def get_dtable_export_big_data_screen_app(username, repo_id, dtable_uuid, app_uu clear_tmp_files_and_dirs(tmp_file_path, tmp_zip_path) os.makedirs(tmp_file_path.rstrip('/') + '/images', exist_ok=True) - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() try: zip_big_data_screen_app(username, repo_id, dtable_uuid, app_uuid, app_id, tmp_file_path, db_session) @@ -569,12 +569,12 @@ def get_dtable_export_big_data_screen_app(username, repo_id, dtable_uuid, app_uu db_session.close() -def import_big_data_screen_app(username, repo_id, dtable_uuid, app_uuid, app_id, config): +def import_big_data_screen_app(username, repo_id, dtable_uuid, app_uuid, app_id): """ parse the zip in tmp folders and upload it """ tmp_extracted_path = os.path.join('/tmp/dtable-io', dtable_uuid, 'big_data_screen_zip_extracted', app_uuid) - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() try: post_big_data_screen_app_zip_file(username, repo_id, dtable_uuid, app_uuid, app_id, tmp_extracted_path, db_session) except Exception as e: @@ -588,7 +588,7 @@ def import_big_data_screen_app(username, repo_id, dtable_uuid, app_uuid, app_id, except Exception as e: dtable_io_logger.error('rm extracted tmp file failed. ERROR: {}'.format(e)) -def parse_excel_csv(username, repo_id, file_name, file_type, parse_type, dtable_uuid, config): +def parse_excel_csv(username, repo_id, file_name, file_type, parse_type, dtable_uuid): """ parse excel or csv to json file, then upload json file to file server """ @@ -602,7 +602,7 @@ def parse_excel_csv(username, repo_id, file_name, file_type, parse_type, dtable_ else: dtable_io_logger.info('parse excel %s.xlsx success!' % file_name) -def import_excel_csv(username, repo_id, dtable_uuid, dtable_name, included_tables, lang, config): +def import_excel_csv(username, repo_id, dtable_uuid, dtable_name, included_tables, lang): """ upload excel or csv json file to dtable-server """ @@ -616,7 +616,7 @@ def import_excel_csv(username, repo_id, dtable_uuid, dtable_name, included_table else: dtable_io_logger.info('import excel or csv %s success!' 
% dtable_name) -def import_excel_csv_add_table(username, dtable_uuid, dtable_name, included_tables, lang, config): +def import_excel_csv_add_table(username, dtable_uuid, dtable_name, included_tables, lang): """ add table, upload excel or csv json file to dtable-server """ @@ -787,11 +787,11 @@ def _upload_to_seafile(seafile_server_url, access_token, files, parent_dir="/", response = requests.post(upload_url, files=files) return response -def get_dtable_transfer_asset_files(username, repo_id, dtable_uuid, files, task_id, files_map, parent_dir, relative_path, replace, repo_api_token, seafile_server_url, config): +def get_dtable_transfer_asset_files(username, repo_id, dtable_uuid, files, task_id, files_map, parent_dir, relative_path, replace, repo_api_token, seafile_server_url): tmp_file_path = os.path.join('/tmp/dtable-io/', dtable_uuid, 'transfer-files', str(task_id)) os.makedirs(tmp_file_path, exist_ok=True) - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() try: # download files to local local_file_list = download_files_to_path(username, repo_id, dtable_uuid, files, tmp_file_path, db_session, files_map) @@ -905,10 +905,10 @@ def send_notification_msg(emails, user_col_key, msg, dtable_uuid, username, tabl dtable_message_logger.info('Notification sending success!') return result -def convert_page_design_to_pdf(dtable_uuid, page_id, row_id, username, config): +def convert_page_design_to_pdf(dtable_uuid, page_id, row_id, username): if not username: username = 'dtable-events' - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() sql = "SELECT `owner`, `org_id` FROM dtables d JOIN workspaces w ON d.workspace_id=w.id WHERE d.uuid=:dtable_uuid" try: result = db_session.execute(text(sql), {'dtable_uuid': uuid_str_to_32_chars(dtable_uuid)}).fetchone() @@ -932,10 +932,10 @@ def convert_page_design_to_pdf(dtable_uuid, page_id, row_id, username, config): dtable_io_logger.exception('dtable: %s plugin: page-design page: %s row: %s error: %s', dtable_uuid, page_id, row_id, e) -def convert_document_to_pdf(dtable_uuid, doc_uuid, row_id, username, config): +def convert_document_to_pdf(dtable_uuid, doc_uuid, row_id, username): if not username: username = 'dtable-events' - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() sql = "SELECT `owner`, `org_id` FROM dtables d JOIN workspaces w ON d.workspace_id=w.id WHERE d.uuid=:dtable_uuid" try: result = db_session.execute(text(sql), {'dtable_uuid': uuid_str_to_32_chars(dtable_uuid)}).fetchone() @@ -1121,9 +1121,9 @@ def convert_table_to_excel(dtable_uuid, table_id, username, name, repo_id, is_su except: pass -def app_user_sync(dtable_uuid, app_name, app_id, table_name, table_id, username, config): +def app_user_sync(dtable_uuid, app_name, app_id, table_name, table_id, username): dtable_io_logger.info('Start sync app %s users: to table %s.' % (app_name, table_name)) - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() try: sync_app_users_to_table(dtable_uuid, app_id, table_name, table_id, username, db_session) except Exception as e: @@ -1136,9 +1136,9 @@ def app_user_sync(dtable_uuid, app_name, app_id, table_name, table_id, username, db_session.close() -def email_sync(context, config): +def email_sync(context): dtable_data_sync_logger.info('Start sync email to dtable %s, email table %s.' 
% (context.get('dtable_uuid'), context.get('detail',{}).get('email_table_id'))) - context['db_session_class'] = init_db_session_class(config) + context['db_session_class'] = init_db_session_class() try: run_sync_emails(context) @@ -1148,7 +1148,7 @@ def email_sync(context, config): dtable_data_sync_logger.info('sync email success, sync_id: %s' % context.get('data_sync_id')) -def plugin_email_send_email(context, config=None): +def plugin_email_send_email(context): dtable_plugin_email_logger.info('Start send email by plugin %s, email table %s.' % (context.get('dtable_uuid'), context.get('table_info', {}).get('email_table_name'))) dtable_uuid = context.get('dtable_uuid') @@ -1166,7 +1166,7 @@ def plugin_email_send_email(context, config=None): thread_table_name = table_info.get('thread_table_name') # send email - toggle_send_email(account_id, email_info, username, config) + toggle_send_email(account_id, email_info, username) send_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') diff --git a/dtable_events/dtable_io/dtable_io_server.py b/dtable_events/dtable_io/dtable_io_server.py index ef20d72a..6ca43715 100644 --- a/dtable_events/dtable_io/dtable_io_server.py +++ b/dtable_events/dtable_io/dtable_io_server.py @@ -1,6 +1,8 @@ from threading import Thread from waitress import serve +from dtable_events.app.config import IO_SERVER_HOST, IO_SERVER_PORT, IO_SERVER_TASK_TIMEOUT, \ + IO_SERVER_WORKERS from dtable_events.dtable_io.request_handler import app as application from dtable_events.dtable_io.task_big_data_manager import big_data_task_manager from dtable_events.dtable_io.task_manager import task_manager @@ -11,15 +13,15 @@ class DTableIOServer(Thread): - def __init__(self, app, config): + def __init__(self, app): Thread.__init__(self) - self._parse_config(config) + self._parse_config() self.app = app - task_manager.init(self.app, self._workers, self._file_server_port, self._io_task_timeout, config) - message_task_manager.init(self._workers, self._file_server_port, self._io_task_timeout, config) - data_sync_task_manager.init(self._workers, self._file_server_port, self._io_task_timeout, config) - plugin_email_task_manager.init(self._workers, self._file_server_port, self._io_task_timeout, config) - big_data_task_manager.init(self._workers, self._file_server_port, self._io_task_timeout, config) + task_manager.init(self.app, self._workers, self._io_task_timeout) + message_task_manager.init(self._workers, self._io_task_timeout) + data_sync_task_manager.init(self._workers, self._io_task_timeout) + plugin_email_task_manager.init(self._workers, self._io_task_timeout) + big_data_task_manager.init(self._workers, self._io_task_timeout) task_manager.run() message_task_manager.run() @@ -27,31 +29,11 @@ def __init__(self, app, config): plugin_email_task_manager.run() big_data_task_manager.run() - def _parse_config(self, config): - self._host = '127.0.0.1' - self._port = '6000' - self._workers = 3 - self._io_task_timeout = 3600 - self._file_server_port = 8082 - - section_name = 'DTABLE IO' - if not config.has_section(section_name): - section_name = 'DTABLE-IO' - - if config.has_option(section_name, 'host'): - self._host = config.get(section_name, 'host') - if config.has_option(section_name, 'port'): - self._port = config.getint(section_name, 'port') - - if config.has_option(section_name, 'workers'): - self._workers = config.getint(section_name, 'workers') - - if config.has_option(section_name, 'io_task_timeout'): - self._io_task_timeout = config.getint(section_name, 'io_task_timeout') - - if 
config.has_option(section_name, 'file_server_port'): - self._file_server_port = config.getint(section_name, 'file_server_port') - + def _parse_config(self): + self._host = IO_SERVER_HOST + self._port = IO_SERVER_PORT + self._workers = IO_SERVER_WORKERS + self._io_task_timeout = IO_SERVER_TASK_TIMEOUT def run(self): serve(application, host=self._host, port=int(self._port)) diff --git a/dtable_events/dtable_io/excel.py b/dtable_events/dtable_io/excel.py index 817d5768..a9896a99 100644 --- a/dtable_events/dtable_io/excel.py +++ b/dtable_events/dtable_io/excel.py @@ -9,7 +9,7 @@ from openpyxl import load_workbook from copy import deepcopy from datetime import datetime, time -from dtable_events.app.config import EXPORT2EXCEL_DEFAULT_STRING, TIME_ZONE, INNER_DTABLE_DB_URL, INNER_DTABLE_SERVER_URL +from dtable_events.app.config import TIME_ZONE, INNER_DTABLE_DB_URL, INNER_DTABLE_SERVER_URL from dtable_events.utils import utc_to_tz, gen_random_option, format_date_in_query from dtable_events.utils.constants import ColumnTypes, FormulaResultType from dtable_events.utils.geo_location_parser import parse_geolocation_from_tree @@ -1810,7 +1810,7 @@ def write_xls_with_type(data_list, email2nickname, ws, row_num, dtable_uuid, rep if not column_error_log_exists: dtable_io_logger.error('Error column in exporting excel: {}'.format(e)) column_error_log_exists = True - c = WriteOnlyCell(ws, value=EXPORT2EXCEL_DEFAULT_STRING) + c = WriteOnlyCell(ws, value='illegal character in excel') head_cell_list.append(c) col_num += 1 ws.append(head_cell_list) diff --git a/dtable_events/dtable_io/import_sync_common_dataset.py b/dtable_events/dtable_io/import_sync_common_dataset.py index 4e86128c..6468c0fb 100644 --- a/dtable_events/dtable_io/import_sync_common_dataset.py +++ b/dtable_events/dtable_io/import_sync_common_dataset.py @@ -10,14 +10,14 @@ from dtable_events.dtable_io.task_manager import task_manager from dtable_events.app.config import INNER_DTABLE_SERVER_URL -def force_sync_common_dataset(context: dict, config): +def force_sync_common_dataset(context: dict): """ force apply common dataset to all syncs """ dataset_id = context.get('dataset_id') dst_dtable_uuids = context.get('dst_dtable_uuids') # select valid syncs - session_class = init_db_session_class(config) + session_class = init_db_session_class() sql = ''' SELECT dcds.dst_dtable_uuid, dcds.dst_table_id, dcd.table_id AS src_table_id, dcd.view_id AS src_view_id, dcd.dtable_uuid AS src_dtable_uuid, dcds.id AS sync_id, dcds.src_version, dcd.id AS dataset_id @@ -45,7 +45,7 @@ def force_sync_common_dataset(context: dict, config): task_manager.finish_dataset_sync(sync_item.sync_id) -def sync_common_dataset(context, config): +def sync_common_dataset(context): """ sync common dataset to destination table @@ -78,7 +78,7 @@ def sync_common_dataset(context, config): # get database version try: - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() except Exception as e: cds_logger.error('create db session failed. ERROR: {}'.format(e)) return @@ -182,7 +182,7 @@ def sync_common_dataset(context, config): db_session.close() -def import_common_dataset(context, config): +def import_common_dataset(context): """ import common dataset to destination table """ @@ -202,7 +202,7 @@ def import_common_dataset(context, config): org_id = context.get('org_id') try: - db_session = init_db_session_class(config)() + db_session = init_db_session_class()() except Exception as e: db_session = None cds_logger.error('create db session failed. 
ERROR: {}'.format(e)) diff --git a/dtable_events/dtable_io/task_big_data_manager.py b/dtable_events/dtable_io/task_big_data_manager.py index c9452800..8a9a87fd 100644 --- a/dtable_events/dtable_io/task_big_data_manager.py +++ b/dtable_events/dtable_io/task_big_data_manager.py @@ -12,19 +12,15 @@ def __init__(self): self.tasks_status_map = {} self.tasks_queue = queue.Queue(10) self.conf = None - self.config = None self.current_task_info = None self.t = None self.threads = [] self.conf = {} - def init(self, workers, file_server_port, io_task_timeout, config): - self.conf['file_server_port'] = file_server_port + def init(self, workers, io_task_timeout): self.conf['io_task_timeout'] = io_task_timeout self.conf['workers'] = workers - self.config = config - def is_valid_task_id(self, task_id): return task_id in self.tasks_map.keys() diff --git a/dtable_events/dtable_io/task_data_sync_manager.py b/dtable_events/dtable_io/task_data_sync_manager.py index d77ce8d2..f78b945c 100644 --- a/dtable_events/dtable_io/task_data_sync_manager.py +++ b/dtable_events/dtable_io/task_data_sync_manager.py @@ -8,17 +8,13 @@ class TaskDataSyncManager(object): def __init__(self): self.tasks_map = {} self.tasks_queue = queue.Queue(10) - self.config = None self.current_task_info = {} self.conf = {} - def init(self, workers, file_server_port, io_task_timeout, config): - self.conf['file_server_port'] = file_server_port + def init(self, workers, io_task_timeout): self.conf['io_task_timeout'] = io_task_timeout self.conf['workers'] = workers - self.config = config - def is_valid_task_id(self, task_id): return task_id in self.tasks_map.keys() @@ -26,7 +22,7 @@ def add_sync_email_task(self, context): from dtable_events.dtable_io import email_sync task_id = str(uuid.uuid4()) - task = (email_sync, (context, self.config)) + task = (email_sync, (context,)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task diff --git a/dtable_events/dtable_io/task_manager.py b/dtable_events/dtable_io/task_manager.py index 03c0dc07..0947e9b2 100644 --- a/dtable_events/dtable_io/task_manager.py +++ b/dtable_events/dtable_io/task_manager.py @@ -42,7 +42,6 @@ def __init__(self): self.tasks_map = {} self.task_results_map = {} self.tasks_queue = queue.Queue(10) - self.config = None self.current_task_info = {} self.threads = [] @@ -54,14 +53,11 @@ def __init__(self): self.conf = {} - def init(self, app, workers, file_server_port, io_task_timeout, config): + def init(self, app, workers, io_task_timeout): self.app = app - self.conf['file_server_port'] = file_server_port self.conf['io_task_timeout'] = io_task_timeout self.conf['workers'] = workers - self.config = config - def is_valid_task_id(self, task_id): return task_id in (self.tasks_map.keys() | self.task_results_map.keys()) @@ -77,10 +73,10 @@ def add_export_task(self, username, repo_id, workspace_id, dtable_uuid, dtable_n task_id = str(uuid.uuid4()) if not is_export_folder: task = (get_dtable_export_content, - (username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, self.config, task_id)) + (username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, task_id)) else: task = (get_dtable_export_content_folder, - (username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, self.config, folder_path, task_id)) + (username, repo_id, workspace_id, dtable_uuid, asset_dir_id, ignore_archive_backup, folder_path, task_id)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 
metric_name='io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -97,11 +93,11 @@ def add_import_task(self, username, repo_id, workspace_id, dtable_uuid, dtable_f if not is_import_folder: task = (post_dtable_import_files, (username, repo_id, workspace_id, dtable_uuid, dtable_file_name, in_storage, - can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, self.config, task_id)) + can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, task_id)) else: task = (post_dtable_import_files_folder, (username, repo_id, workspace_id, dtable_uuid, folder_path, in_storage, - can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, self.config, task_id)) + can_use_automation_rules, can_use_workflows, can_use_external_apps, can_import_archive, owner, org_id, task_id)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', TASK_MANAGER_METRIC_HELP) @@ -113,7 +109,7 @@ def add_export_dtable_asset_files_task(self, username, repo_id, dtable_uuid, fil task_id = str(uuid.uuid4()) task = (get_dtable_export_asset_files, - (username, repo_id, dtable_uuid, files, task_id, self.config, files_map)) + (username, repo_id, dtable_uuid, files, task_id, files_map)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -146,7 +142,7 @@ def add_export_dtable_big_data_screen_app_task(self, username, repo_id, dtable_u from dtable_events.dtable_io import get_dtable_export_big_data_screen_app task_id = str(uuid.uuid4()) task = (get_dtable_export_big_data_screen_app, - (username, repo_id, dtable_uuid, app_uuid, app_id, task_id, self.config)) + (username, repo_id, dtable_uuid, app_uuid, app_id, task_id)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -157,7 +153,7 @@ def add_import_dtable_big_data_screen_app_task(self, username, repo_id, dtable_u from dtable_events.dtable_io import import_big_data_screen_app task_id = str(uuid.uuid4()) task = (import_big_data_screen_app, - (username, repo_id, dtable_uuid, app_uuid, app_id, self.config)) + (username, repo_id, dtable_uuid, app_uuid, app_id)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -178,8 +174,7 @@ def add_transfer_dtable_asset_files_task(self, username, repo_id, dtable_uuid, f relative_path, replace, repo_api_token, - seafile_server_url, - self.config)) + seafile_server_url)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -191,7 +186,7 @@ def add_parse_excel_csv_task(self, username, repo_id, file_name, file_type, pars task_id = str(uuid.uuid4()) task = (parse_excel_csv, - (username, repo_id, file_name, file_type, parse_type, dtable_uuid, self.config)) + (username, repo_id, file_name, file_type, parse_type, dtable_uuid)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -203,7 +198,7 @@ def add_import_excel_csv_task(self, username, repo_id, dtable_uuid, dtable_name, task_id = 
str(uuid.uuid4()) task = (import_excel_csv, - (username, repo_id, dtable_uuid, dtable_name, included_tables, lang, self.config)) + (username, repo_id, dtable_uuid, dtable_name, included_tables, lang)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -215,7 +210,7 @@ def add_import_excel_csv_add_table_task(self, username, dtable_uuid, dtable_name task_id = str(uuid.uuid4()) task = (import_excel_csv_add_table, - (username, dtable_uuid, dtable_name, included_tables, lang, self.config)) + (username, dtable_uuid, dtable_name, included_tables, lang)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -258,7 +253,7 @@ def add_run_auto_rule_task(self, automation_rule_id, creator, owner, org_id, dta 'rule_id': automation_rule_id } - task = (run_auto_rule_task, (trigger, actions, options, self.config)) + task = (run_auto_rule_task, (trigger, actions, options)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -356,7 +351,7 @@ def convert_page_design_to_pdf(self, dtable_uuid, page_id, row_id, username): task_id = str(uuid.uuid4()) task = (convert_page_design_to_pdf, - (dtable_uuid, page_id, row_id, username, self.config)) + (dtable_uuid, page_id, row_id, username)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -368,7 +363,7 @@ def convert_document_to_pdf(self, dtable_uuid, doc_uuid, row_id, username): task_id = str(uuid.uuid4()) task = (convert_document_to_pdf, - (dtable_uuid, doc_uuid, row_id, username, self.config)) + (dtable_uuid, doc_uuid, row_id, username)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -393,7 +388,7 @@ def add_import_common_dataset_task(self, context): task_id = str(uuid.uuid4()) context['app'] = self.app - task = (import_common_dataset, (context, self.config)) + task = (import_common_dataset, (context,)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -415,7 +410,7 @@ def add_sync_common_dataset_task(self, context): task_id = str(uuid.uuid4()) context['app'] = self.app - task = (sync_common_dataset, (context, self.config)) + task = (sync_common_dataset, (context,)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -437,7 +432,7 @@ def add_force_sync_common_dataset_task(self, context): task_id = str(uuid.uuid4()) context['app'] = self.app - task = (force_sync_common_dataset, (context, self.config)) + task = (force_sync_common_dataset, (context,)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) @@ -472,7 +467,7 @@ def add_convert_table_to_excel_task(self, dtable_uuid, table_id, username, name, def add_app_users_sync_task(self, dtable_uuid, app_name, app_id, table_name, table_id, username): from dtable_events.dtable_io import app_user_sync task_id = str(uuid.uuid4()) - 
task = (app_user_sync, (dtable_uuid, app_name, app_id, table_name, table_id, username, self.config)) + task = (app_user_sync, (dtable_uuid, app_name, app_id, table_name, table_id, username)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), 'io_task_queue_size', metric_help=TASK_MANAGER_METRIC_HELP) diff --git a/dtable_events/dtable_io/task_message_manager.py b/dtable_events/dtable_io/task_message_manager.py index 73570926..8dcde53e 100644 --- a/dtable_events/dtable_io/task_message_manager.py +++ b/dtable_events/dtable_io/task_message_manager.py @@ -14,25 +14,21 @@ def __init__(self): self.tasks_map = {} self.tasks_result_map = {} self.tasks_queue = queue.Queue(10) - self.config = None self.current_task_info = None self.t = None self.conf = {} - def init(self, workers, file_server_port, io_task_timeout, config): - self.conf['file_server_port'] = file_server_port + def init(self, workers, io_task_timeout): self.conf['io_task_timeout'] = io_task_timeout self.conf['workers'] = workers - self.config = config - def is_valid_task_id(self, task_id): return task_id in self.tasks_map.keys() def add_email_sending_task(self, account_id, send_info, username): from dtable_events.utils.email_sender import toggle_send_email task_id = str(uuid.uuid4()) - task = (toggle_send_email, (account_id, send_info, username, self.config)) + task = (toggle_send_email, (account_id, send_info, username)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task publish_metric(self.tasks_queue.qsize(), metric_name='message_io_task_queue_size', metric_help=MESSAGE_TASK_MANAGER_METRIC_HELP) diff --git a/dtable_events/dtable_io/task_plugin_email_manager.py b/dtable_events/dtable_io/task_plugin_email_manager.py index 942dd2e8..744240e4 100644 --- a/dtable_events/dtable_io/task_plugin_email_manager.py +++ b/dtable_events/dtable_io/task_plugin_email_manager.py @@ -12,17 +12,13 @@ def __init__(self): self.tasks_map = {} self.tasks_result_map = {} self.tasks_queue = queue.Queue(10) - self.config = None self.current_task_info = {} self.conf = {} - def init(self, workers, file_server_port, io_task_timeout, config): - self.conf['file_server_port'] = file_server_port + def init(self, workers, io_task_timeout): self.conf['io_task_timeout'] = io_task_timeout self.conf['workers'] = workers - self.config = config - def is_valid_task_id(self, task_id): return task_id in self.tasks_map.keys() @@ -30,7 +26,7 @@ def add_send_email_task(self, context): from dtable_events.dtable_io import plugin_email_send_email task_id = str(uuid.uuid4()) - task = (plugin_email_send_email, (context, self.config)) + task = (plugin_email_send_email, (context,)) self.tasks_queue.put(task_id) self.tasks_map[task_id] = task diff --git a/dtable_events/main.py b/dtable_events/main.py index b59d7c29..8ab96448 100644 --- a/dtable_events/main.py +++ b/dtable_events/main.py @@ -11,42 +11,26 @@ def main(): - args = parser.parse_args() - app_logger = LogConfigurator(args.loglevel, args.logfile) + args, _ = parser.parse_known_args() + LogConfigurator() - config = get_config(args.config_file) - - redis_cache.init_redis(config) # init redis instance for redis_cache - - seafile_conf_path = '/opt/seatable/conf/seafile.conf' - for conf_dir in [ - os.environ.get('SEAFILE_CENTRAL_CONF_DIR'), - os.environ.get('SEAFILE_DATA_DIR') - ]: - if os.path.isfile(os.path.join(conf_dir, 'seafile.conf')): - seafile_conf_path = os.path.join(conf_dir, 'seafile.conf') - break - - seafile_config = get_config(seafile_conf_path) + 
redis_cache.init_redis() # init redis instance for redis_cache try: - create_db_tables(config) - prepare_seafile_tables(seafile_config) + create_db_tables() + prepare_seafile_tables() except Exception as e: logging.error('Failed create or prepare tables, error: %s' % e) raise RuntimeError('Failed create or prepare tables, error: %s' % e) task_mode = get_task_mode(args.taskmode) - app = App(config, seafile_config, task_mode) + app = App(task_mode) app.serve_forever() if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument('--config-file', help='config file') - parser.add_argument('--logfile', help='log file') - parser.add_argument('--loglevel', default='info', help='log level') parser.add_argument('--taskmode', default='all', help='task mode') main() diff --git a/dtable_events/notification_rules/dtable_notification_rules_scanner.py b/dtable_events/notification_rules/dtable_notification_rules_scanner.py index 142a5744..0472e965 100644 --- a/dtable_events/notification_rules/dtable_notification_rules_scanner.py +++ b/dtable_events/notification_rules/dtable_notification_rules_scanner.py @@ -6,10 +6,9 @@ from sqlalchemy import text from apscheduler.schedulers.blocking import BlockingScheduler -from dtable_events.app.config import TIME_ZONE +from dtable_events.app.config import TIME_ZONE, NOTIFICATION_RULES_SCAN_ENABLED from dtable_events.db import init_db_session_class from dtable_events.notification_rules.notification_rules_utils import trigger_near_deadline_notification_rule -from dtable_events.utils import get_opt_from_conf_or_env, parse_bool timezone = TIME_ZONE @@ -22,32 +21,19 @@ class DTableNofiticationRulesScanner(object): - def __init__(self, config): + def __init__(self): self._enabled = True self._logfile = None - self._parse_config(config) + self._parse_config() self._prepare_logfile() - self._db_session_class = init_db_session_class(config) + self._db_session_class = init_db_session_class() def _prepare_logfile(self): logdir = os.path.join(os.environ.get('LOG_DIR', '')) self._logfile = os.path.join(logdir, 'dtables_notification_rule_scanner.log') - def _parse_config(self, config): - """parse send email related options from config file - """ - section_name = 'NOTIFY SCANNER' - key_enabled = 'enabled' - - if not config.has_section(section_name): - section_name = 'NOTIFY-SCANNER' - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - enabled = parse_bool(enabled) - self._enabled = enabled + def _parse_config(self): + self._enabled = NOTIFICATION_RULES_SCAN_ENABLED def start(self): if not self.is_enabled(): diff --git a/dtable_events/notification_rules/handler.py b/dtable_events/notification_rules/handler.py index 4d9ef52b..c0a529ed 100644 --- a/dtable_events/notification_rules/handler.py +++ b/dtable_events/notification_rules/handler.py @@ -11,11 +11,11 @@ class NotificationRuleHandler(Thread): - def __init__(self, config): + def __init__(self): Thread.__init__(self) self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() def run(self): logger.info('Starting handle notification rules...') diff --git a/dtable_events/statistics/counter.py b/dtable_events/statistics/counter.py index eec662ab..890cb796 100644 --- a/dtable_events/statistics/counter.py +++ b/dtable_events/statistics/counter.py @@ -12,11 +12,11 @@ 
class UserActivityCounter(Thread): - def __init__(self, config): + def __init__(self): Thread.__init__(self) self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() def run(self): logger.info('Starting count user activity...') diff --git a/dtable_events/tasks/ai_stats_worker.py b/dtable_events/tasks/ai_stats_worker.py index 15eabf72..c7a6df71 100644 --- a/dtable_events/tasks/ai_stats_worker.py +++ b/dtable_events/tasks/ai_stats_worker.py @@ -10,7 +10,7 @@ from dateutil import relativedelta from sqlalchemy import text -from dtable_events.app.config import AI_PRICES, BAIDU_OCR_TOKENS +from dtable_events.app.config import AI_PRICES, BAIDU_OCR_TOKENS, AI_STATS_ENABLED from dtable_events.app.event_redis import RedisClient, redis_cache from dtable_events.db import init_db_session_class from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, uuid_str_to_36_chars, uuid_str_to_32_chars @@ -20,26 +20,18 @@ class AIStatsWorker: - def __init__(self, config): - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + def __init__(self): + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() self.stats_lock = Lock() self.channel = 'log_ai_model_usage' self.keep_months = 3 self.owner_info_cache_timeout = 24 * 60 * 60 - self._parse_config(config) + self._parse_config() self.reset_stats() - def _parse_config(self, config): - """parse send email related options from config file - """ - section_name = 'AI STATS' - key_enabled = 'enabled' - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - enabled = parse_bool(enabled) - self._enabled = enabled + def _parse_config(self): + self._enabled = AI_STATS_ENABLED def reset_stats(self): self.org_stats = defaultdict(lambda: defaultdict(lambda: {'input_tokens': 0, 'output_tokens': 0})) diff --git a/dtable_events/tasks/big_data_storage_stats_worker.py b/dtable_events/tasks/big_data_storage_stats_worker.py index 0c38c158..b7487c33 100644 --- a/dtable_events/tasks/big_data_storage_stats_worker.py +++ b/dtable_events/tasks/big_data_storage_stats_worker.py @@ -1,16 +1,13 @@ # -*- coding: utf-8 -*- import os -import time import uuid import logging from threading import Thread -import jwt -import requests from apscheduler.schedulers.blocking import BlockingScheduler from sqlalchemy import text -from dtable_events.app.config import INNER_DTABLE_DB_URL, DTABLE_PRIVATE_KEY +from dtable_events.app.config import INNER_DTABLE_DB_URL from dtable_events.db import init_db_session_class from dtable_events.utils.dtable_db_api import DTableDBAPI @@ -51,9 +48,9 @@ def update_org_big_data_storage_stats(db_session): class BigDataStorageStatsWorker(object): - def __init__(self, config): + def __init__(self): self._logfile = None - self._db_session_class = init_db_session_class(config) + self._db_session_class = init_db_session_class() self._prepare_logfile() def _prepare_logfile(self): diff --git a/dtable_events/tasks/clean_db_records_worker.py b/dtable_events/tasks/clean_db_records_worker.py index ac180bd7..6552155f 100644 --- a/dtable_events/tasks/clean_db_records_worker.py +++ b/dtable_events/tasks/clean_db_records_worker.py @@ -7,7 +7,11 @@ from apscheduler.schedulers.blocking import BlockingScheduler from sqlalchemy import text -from dtable_events.app.config import ENABLE_OPERATION_LOG_DB +from 
dtable_events.app.config import ENABLE_OPERATION_LOG_DB, CLEAN_DB_ENABLED, CLEAN_DB_KEEP_DTABLE_SNAPSHOT_DAYS, \ + CLEAN_DB_KEEP_ACTIVITIES_DAYS, CLEAN_DB_KEEP_OPERATION_LOG_DAYS, CLEAN_DB_KEEP_DELETE_OPERATION_LOG_DAYS, \ + CLEAN_DB_KEEP_DTABLE_DB_OP_LOG_DAYS, CLEAN_DB_KEEP_NOTIFICATIONS_USERNOTIFICATION_DAYS, \ + CLEAN_DB_KEEP_DTABLE_NOTIFICATIONS_DAYS, CLEAN_DB_KEEP_SESSION_LOG_DAYS, CLEAN_DB_KEEP_AUTO_RULES_TASK_LOG_DAYS, \ + CLEAN_DB_KEEP_USER_ACTIVITY_STATISTICS_DAYS, CLEAN_DB_KEEP_DTABLE_APP_PAGES_OPERATION_LOG_DAYS from dtable_events.db import init_db_session_class __all__ = [ @@ -16,48 +20,28 @@ class CleanDBRecordsWorker(object): - def __init__(self, config): + def __init__(self): self._enabled = False - self._db_session_class = init_db_session_class(config) - - try: - self._parse_config(config) - except: - logging.exception('Could not parse config, using default retention config instead') - # Use default configuration - self._retention_config = RetentionConfig() - - def _parse_config(self, config): - section_name = 'CLEAN DB' - - self._enabled = config.getboolean(section_name, 'enabled', fallback=True) - - # Read retention times from config file - dtable_snapshot = config.getint(section_name, 'keep_dtable_snapshot_days', fallback=365) - activities = config.getint(section_name, 'keep_activities_days', fallback=30) - operation_log = config.getint(section_name, 'keep_operation_log_days', fallback=14) - delete_operation_log = config.getint(section_name, 'keep_delete_operation_log_days', fallback=30) - dtable_db_op_log = config.getint(section_name, 'keep_dtable_db_op_log_days', fallback=30) - notifications_usernotification = config.getint(section_name, 'keep_notifications_usernotification_days', fallback=30) - dtable_notifications = config.getint(section_name, 'keep_dtable_notifications_days', fallback=30) - session_log = config.getint(section_name, 'keep_session_log_days', fallback=30) - auto_rules_task_log = config.getint(section_name, 'keep_auto_rules_task_log_days', fallback=30) - # Disabled by default - user_activity_statistics = config.getint(section_name, 'keep_user_activity_statistics_days', fallback=0) - dtable_app_pages_operation_log = config.getint(section_name, 'keep_dtable_app_pages_operation_log_days', fallback=14) + self._db_session_class = init_db_session_class() + + self._parse_config() + + def _parse_config(self): + + self._enabled = CLEAN_DB_ENABLED self._retention_config = RetentionConfig( - dtable_snapshot=dtable_snapshot, - activities=activities, - operation_log=operation_log, - delete_operation_log=delete_operation_log, - dtable_db_op_log=dtable_db_op_log, - notifications_usernotification=notifications_usernotification, - dtable_notifications=dtable_notifications, - session_log=session_log, - auto_rules_task_log=auto_rules_task_log, - user_activity_statistics=user_activity_statistics, - dtable_app_pages_operation_log=dtable_app_pages_operation_log, + dtable_snapshot=CLEAN_DB_KEEP_DTABLE_SNAPSHOT_DAYS, + activities=CLEAN_DB_KEEP_ACTIVITIES_DAYS, + operation_log=CLEAN_DB_KEEP_OPERATION_LOG_DAYS, + delete_operation_log=CLEAN_DB_KEEP_DELETE_OPERATION_LOG_DAYS, + dtable_db_op_log=CLEAN_DB_KEEP_DTABLE_DB_OP_LOG_DAYS, + notifications_usernotification=CLEAN_DB_KEEP_NOTIFICATIONS_USERNOTIFICATION_DAYS, + dtable_notifications=CLEAN_DB_KEEP_DTABLE_NOTIFICATIONS_DAYS, + session_log=CLEAN_DB_KEEP_SESSION_LOG_DAYS, + auto_rules_task_log=CLEAN_DB_KEEP_AUTO_RULES_TASK_LOG_DAYS, + user_activity_statistics=CLEAN_DB_KEEP_USER_ACTIVITY_STATISTICS_DAYS, + 
dtable_app_pages_operation_log=CLEAN_DB_KEEP_DTABLE_APP_PAGES_OPERATION_LOG_DAYS, ) def start(self): diff --git a/dtable_events/tasks/dtable_asset_trash_cleaner.py b/dtable_events/tasks/dtable_asset_trash_cleaner.py index 733cbbaa..6e440207 100644 --- a/dtable_events/tasks/dtable_asset_trash_cleaner.py +++ b/dtable_events/tasks/dtable_asset_trash_cleaner.py @@ -18,9 +18,9 @@ class DTableAssetTrashCleaner(object): - def __init__(self, config): + def __init__(self): self._enabled = True - self._db_session_class = init_db_session_class(config) + self._db_session_class = init_db_session_class() self._expire_days = 60 def start(self): diff --git a/dtable_events/tasks/dtable_file_access_log_cleaner.py b/dtable_events/tasks/dtable_file_access_log_cleaner.py index 2a3d9873..bed77cc6 100644 --- a/dtable_events/tasks/dtable_file_access_log_cleaner.py +++ b/dtable_events/tasks/dtable_file_access_log_cleaner.py @@ -18,9 +18,9 @@ class DTableFileAccessLogCleaner(object): - def __init__(self, config): + def __init__(self): self._enabled = True - self._db_session_class = init_db_session_class(config) + self._db_session_class = init_db_session_class() self._expire_days = 60 def start(self): diff --git a/dtable_events/tasks/dtable_real_time_rows_counter.py b/dtable_events/tasks/dtable_real_time_rows_counter.py index fe4e7443..9ee61d73 100644 --- a/dtable_events/tasks/dtable_real_time_rows_counter.py +++ b/dtable_events/tasks/dtable_real_time_rows_counter.py @@ -114,11 +114,11 @@ def count_rows_by_uuids(session, dtable_uuids): class DTableRealTimeRowsCounter(Thread): - def __init__(self, config): + def __init__(self): Thread.__init__(self) self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() def run(self): diff --git a/dtable_events/tasks/dtable_updates_sender.py b/dtable_events/tasks/dtable_updates_sender.py index 0d0860e8..27f69078 100644 --- a/dtable_events/tasks/dtable_updates_sender.py +++ b/dtable_events/tasks/dtable_updates_sender.py @@ -4,7 +4,7 @@ from threading import Thread, Event from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, get_python_executable, run_and_wait -from dtable_events.app.config import dtable_web_dir +from dtable_events.app.config import dtable_web_dir, UPDATES_SENDER_ENABLED __all__ = [ @@ -14,30 +14,19 @@ class DTableUpdatesSender(object): - def __init__(self, config): + def __init__(self): self._enabled = True self._logfile = None self._interval = 60 * 60 self._prepare_logfile() - self._parse_config(config) + self._parse_config() def _prepare_logfile(self): log_dir = os.environ.get('LOG_DIR', '') self._logfile = os.path.join(log_dir, 'dtable_updates_sender.log') - def _parse_config(self, config): - """parse send email related options from config file - """ - section_name = 'EMAIL SENDER' - key_enabled = 'enabled' - - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - enabled = parse_bool(enabled) - self._enabled = enabled + def _parse_config(self): + self._enabled = UPDATES_SENDER_ENABLED def start(self): if not self.is_enabled(): diff --git a/dtable_events/tasks/dtables_cleaner.py b/dtable_events/tasks/dtables_cleaner.py index 80b78993..20547b71 100644 --- a/dtable_events/tasks/dtables_cleaner.py +++ b/dtable_events/tasks/dtables_cleaner.py @@ -2,7 +2,7 @@ import logging from threading import Thread, 
Event -from dtable_events.utils import get_opt_from_conf_or_env, get_python_executable, run +from dtable_events.utils import get_python_executable, run from dtable_events.app.config import dtable_web_dir, TRASH_CLEAN_AFTER_DAYS __all__ = [ @@ -12,18 +12,18 @@ class DTablesCleaner(object): - def __init__(self, config): + def __init__(self): self._enabled = True self._logfile = None self._interval = 60 * 60 * 24 self._prepare_logfile() - self._parse_config(config) + self._parse_config() def _prepare_logfile(self): logdir = os.path.join(os.environ.get('LOG_DIR', '')) self._logfile = os.path.join(logdir, 'dtables_cleaner.log') - def _parse_config(self, config): + def _parse_config(self): self._expire_seconds = 60 * 60 * 24 * TRASH_CLEAN_AFTER_DAYS def start(self): diff --git a/dtable_events/tasks/email_notices_sender.py b/dtable_events/tasks/email_notices_sender.py index fd4c59b2..91f31f5d 100644 --- a/dtable_events/tasks/email_notices_sender.py +++ b/dtable_events/tasks/email_notices_sender.py @@ -4,7 +4,7 @@ from threading import Thread, Event from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, get_python_executable, run, parse_interval -from dtable_events.app.config import dtable_web_dir +from dtable_events.app.config import dtable_web_dir, EMAIL_SENDER_ENABLED, EMAIL_SENDER_INTERVAL __all__ = [ 'EmailNoticesSender', @@ -12,35 +12,22 @@ class EmailNoticesSender(object): - def __init__(self, config): + def __init__(self): self._enabled = True self._logfile = None self._interval = 60 * 60 # 60min self._prepare_logfile() - self._parse_config(config) + self._parse_config() def _prepare_logfile(self): logdir = os.path.join(os.environ.get('LOG_DIR', '')) self._logfile = os.path.join(logdir, 'email_notices_sender.log') - def _parse_config(self, config): + def _parse_config(self): """parse send email related options from config file """ - section_name = 'EMAIL SENDER' - key_enabled = 'enabled' - key_interval = 'interval' - - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - enabled = parse_bool(enabled) - self._enabled = enabled - # interval - interval = get_opt_from_conf_or_env(config, section_name, key_interval, default=60 * 60) - interval = parse_interval(interval, 60 * 60) - self._interval = interval + self._enabled = EMAIL_SENDER_ENABLED + self._interval = EMAIL_SENDER_INTERVAL def start(self): if not self.is_enabled(): diff --git a/dtable_events/tasks/instant_notices_sender.py b/dtable_events/tasks/instant_notices_sender.py index a28db800..4c54cda3 100644 --- a/dtable_events/tasks/instant_notices_sender.py +++ b/dtable_events/tasks/instant_notices_sender.py @@ -4,9 +4,9 @@ import logging from threading import Thread, Event -from dtable_events.app.config import ENABLE_WEIXIN, ENABLE_WORK_WEIXIN, ENABLE_DINGTALK, dtable_web_dir -from dtable_events.utils import get_python_executable, parse_bool, \ - parse_interval, get_opt_from_conf_or_env, run +from dtable_events.app.config import ENABLE_WEIXIN, ENABLE_WORK_WEIXIN, ENABLE_DINGTALK, dtable_web_dir, \ + INSTANT_SENDER_INTERVAL +from dtable_events.utils import get_python_executable, parse_bool, run __all__ = [ 'InstantNoticeSender', @@ -15,24 +15,20 @@ class InstantNoticeSender(object): - def __init__(self, config): + def __init__(self): self._enabled = False self._interval = None self._logfile = None - self._parse_config(config) + self._parse_config() self._prepare_logfile() def _prepare_logfile(self): log_dir = 
os.path.join(os.environ.get('LOG_DIR', '')) self._logfile = os.path.join(log_dir, 'instant_notice_sender.log') - def _parse_config(self, config): + def _parse_config(self): """parse instant related options from config file """ - section_name = 'INSTANT SENDER' - key_interval = 'interval' - default_interval = 60 # 1min - # enabled enabled = ENABLE_WEIXIN or ENABLE_WORK_WEIXIN or ENABLE_DINGTALK enabled = parse_bool(enabled) @@ -40,15 +36,7 @@ def _parse_config(self, config): return self._enabled = True - # notice send interval - if config.has_section(section_name): - interval = get_opt_from_conf_or_env(config, section_name, key_interval, - default=default_interval).lower() - interval = parse_interval(interval, default_interval) - else: - interval = default_interval - - self._interval = interval + self._interval = INSTANT_SENDER_INTERVAL def start(self): if not self.is_enabled(): diff --git a/dtable_events/tasks/ldap_syncer.py b/dtable_events/tasks/ldap_syncer.py index 3fa362bc..f1e03c0f 100644 --- a/dtable_events/tasks/ldap_syncer.py +++ b/dtable_events/tasks/ldap_syncer.py @@ -2,39 +2,26 @@ import logging from threading import Thread, Event -from dtable_events.utils import get_opt_from_conf_or_env, \ - get_python_executable, run_and_wait, parse_bool, parse_interval -from dtable_events.app.config import dtable_web_dir +from dtable_events.utils import get_python_executable, run_and_wait +from dtable_events.app.config import dtable_web_dir, LDAP_SYNC_ENABLED, LDAP_SYNC_INTERVAL class LDAPSyncer(object): - def __init__(self, config): + def __init__(self): self._enabled = False self._logfile = None self._interval = 60 * 60 self._prepare_logfile() - self._prepara_config(config) + self._prepare_config() def _prepare_logfile(self): logdir = os.path.join(os.environ.get('LOG_DIR', '')) self._logfile = os.path.join(logdir, 'ldap_syncer.log') - def _prepara_config(self, config): - section_name = 'LDAP SYNC' - key_enabled = 'enabled' - key_sync_interval = 'sync_interval' - - if not config.has_section(section_name): - section_name = 'LDAP_SYNC' - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=False) - self._enabled = parse_bool(enabled) - interval = get_opt_from_conf_or_env(config, section_name, key_sync_interval, default=60*60) - self._interval = parse_interval(interval, 60*60) + def _prepare_config(self): + self._enabled = LDAP_SYNC_ENABLED + self._interval = LDAP_SYNC_INTERVAL def start(self): if not self.is_enabled(): diff --git a/dtable_events/tasks/metrics.py b/dtable_events/tasks/metrics.py index 66998b6f..f20fa780 100644 --- a/dtable_events/tasks/metrics.py +++ b/dtable_events/tasks/metrics.py @@ -19,10 +19,10 @@ class MetricReceiver(Thread): """ collect metrics from redis channel and save to local """ - def __init__(self, config): + def __init__(self): Thread.__init__(self) self._finished = Event() - self._redis_client = RedisClient(config=config) + self._redis_client = RedisClient() def run(self): if not self._redis_client.connection: logging.warning('Redis connection is not established.') @@ -76,9 +76,6 @@ def time_job(): class MetricManager(object): - def __init__(self, config): - self.config = config - def start(self): logging.info('Start metric manager...') try: @@ -87,7 +84,7 @@ def start(self): except Exception as e: logging.error('Failed to start metric collect thread: %s' % e) try: - self._metric_task = MetricReceiver(self.config) + self._metric_task = MetricReceiver() 
self._metric_task.start() except Exception as e: logging.error('Failed to start metric handle thread: %s' % e) diff --git a/dtable_events/tasks/universal_app_auto_buckup.py b/dtable_events/tasks/universal_app_auto_buckup.py index d16c4516..313fd995 100644 --- a/dtable_events/tasks/universal_app_auto_buckup.py +++ b/dtable_events/tasks/universal_app_auto_buckup.py @@ -9,7 +9,7 @@ from dtable_events.db import init_db_session_class from dtable_events.app.event_redis import RedisClient from dtable_events.utils import uuid_str_to_36_chars -from dtable_events.app.config import UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_DAYS, UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_NOTES +from dtable_events.app.config import UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_DAYS from sqlalchemy import text from seaserv import seafile_api @@ -17,11 +17,11 @@ logger = logging.getLogger(__name__) class UniversalAppAutoBackup(Thread): - def __init__(self, config): + def __init__(self): Thread.__init__(self) self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() self._lock = Lock() def create_snapshot(self, session, app_id, app_version, app_config): @@ -36,7 +36,7 @@ def create_snapshot(self, session, app_id, app_version, app_config): """ result = session.execute(text(cmd), { 'app_id': app_id, - 'notes': UNIVERSAL_APP_SNAPSHOT_AUTO_SAVE_NOTES, + 'notes': 'auto backup', 'app_version': app_version, 'app_config': app_config, 'created_at': int(time.time()), diff --git a/dtable_events/tasks/virus_scanner.py b/dtable_events/tasks/virus_scanner.py index ba41a497..4a3b3112 100644 --- a/dtable_events/tasks/virus_scanner.py +++ b/dtable_events/tasks/virus_scanner.py @@ -7,8 +7,8 @@ class VirusScanner(object): - def __init__(self, config, seafile_config): - self.settings = Settings(config, seafile_config) + def __init__(self): + self.settings = Settings() def is_enabled(self): return self.settings.is_enabled() diff --git a/dtable_events/utils/email_sender.py b/dtable_events/utils/email_sender.py index 5443d88e..ed7da48b 100644 --- a/dtable_events/utils/email_sender.py +++ b/dtable_events/utils/email_sender.py @@ -396,15 +396,14 @@ def _on_sending_email(self, msg_obj): return requests.post(self.EMAIL_SENDING_ENDPOINT, data=msg_base64, headers=headers) class EmailSender: - def __init__(self, account_id, operator, config=None, db_session=None): + def __init__(self, account_id, operator, db_session=None): """ one of config or db_session is required """ self.account_id = account_id self.operator = operator - self.config = config self.is_config_session = False if db_session else True - self.db_session = db_session or init_db_session_class(self.config)() + self.db_session = db_session or init_db_session_class()() self._sender_init() def _sender_init(self): @@ -458,5 +457,5 @@ def close(self): if self.is_config_session: self.db_session.close() -def toggle_send_email(account_id, send_info, username, config): - return EmailSender(account_id, username, config).send(send_info) +def toggle_send_email(account_id, send_info, username): + return EmailSender(account_id, username).send(send_info) diff --git a/dtable_events/virus_scanner/run_virus_scan.py b/dtable_events/virus_scanner/run_virus_scan.py index 6514d790..56e43209 100644 --- a/dtable_events/virus_scanner/run_virus_scan.py +++ b/dtable_events/virus_scanner/run_virus_scan.py @@ -1,9 +1,7 @@ # coding: utf-8 -import os import sys import logging -import argparse from 
dtable_events.db import prepare_seafile_tables from dtable_events.virus_scanner.scan_settings import Settings @@ -23,27 +21,9 @@ from dtable_events.virus_scanner.scan_settings import logger logger.setLevel(logging.INFO) - parser = argparse.ArgumentParser() - parser.add_argument('-c', '--config-file', - default=os.path.join(os.path.abspath('..'), 'dtable-events.conf'), - help='dtable-events config file') - args = parser.parse_args() - - config = get_config(args.config_file) - seafile_conf_path = '/opt/seafile/conf/seafile.conf' - for conf_dir in [ - os.environ.get('SEAFILE_CENTRAL_CONF_DIR'), - os.environ.get('SEAFILE_DATA_DIR') - ]: - if os.path.isfile(os.path.join(conf_dir, 'seafile.conf')): - seafile_conf_path = os.path.join(conf_dir, 'seafile.conf') - break - - seafile_config = get_config(seafile_conf_path) - - setting = Settings(config, seafile_config) + setting = Settings() if setting.is_enabled(): - prepare_seafile_tables(seafile_config) + prepare_seafile_tables() VirusScan(setting).start() else: logger.info('Virus scan is disabled.') diff --git a/dtable_events/virus_scanner/scan_settings.py b/dtable_events/virus_scanner/scan_settings.py index fcc6ea18..b8914ab8 100644 --- a/dtable_events/virus_scanner/scan_settings.py +++ b/dtable_events/virus_scanner/scan_settings.py @@ -2,6 +2,9 @@ import os import logging +from dtable_events.app.config import VIRUS_SCAN_ENABLED, VIRUS_SCAN_SCAN_COMMAND, VIRUS_SCAN_VIRUS_CODE, \ + VIRUS_SCAN_NONVIRUS_CODE, VIRUS_SCAN_SCAN_INTERVAL, VIRUS_SCAN_SCAN_SIZE_LIMIT, VIRUS_SCAN_SCAN_SKIP_EXT, \ + VIRUS_SCAN_THREADS, EMAIL_SENDER_ENABLED from dtable_events.utils import get_opt_from_conf_or_env, parse_bool from dtable_events.db import init_db_session_class, init_seafile_db_session_class @@ -10,7 +13,7 @@ class Settings(object): - def __init__(self, config, seafile_config): + def __init__(self): self.enable_scan = False self.scan_cmd = None self.vir_codes = None @@ -27,42 +30,35 @@ def __init__(self, config, seafile_config): self.session_cls = None self.seaf_session_cls = None - self.parse_config(config, seafile_config) + self.parse_config() - def parse_config(self, config, seafile_config): - if not self.parse_scan_config(config): + def parse_config(self): + if not self.parse_scan_config(): return try: - self.session_cls = init_db_session_class(config) - self.seaf_session_cls = init_seafile_db_session_class(seafile_config) + self.session_cls = init_db_session_class() + self.seaf_session_cls = init_seafile_db_session_class() except Exception as e: logger.warning('Failed to init db session class: %s', e) return - section_name = 'VIRUS SCAN' - self.enable_scan = config.getboolean(section_name, 'enabled', fallback=False) + self.enable_scan = VIRUS_SCAN_ENABLED - self.parse_send_mail_config(config) + self.parse_send_mail_config() - def parse_scan_config(self, config): - section_name = 'VIRUS SCAN' - if config.has_option(section_name, 'scan_command'): - self.scan_cmd = config.get(section_name, 'scan_command') + def parse_scan_config(self): + self.scan_cmd = VIRUS_SCAN_SCAN_COMMAND if not self.scan_cmd: - logger.info(f'[{section_name}] scan_command option is not found in seafile.conf, disable virus scan.') + logger.info('scan_command option is not set, disable virus scan.') return False - vcode = None - if config.has_option(section_name, 'virus_code'): - vcode = config.get(section_name, 'virus_code') + vcode = VIRUS_SCAN_VIRUS_CODE if not vcode: logger.info('virus_code is not set, disable virus scan.') return False - nvcode = None - if 
config.has_option(section_name, 'nonvirus_code'): - nvcode = config.get(section_name, 'nonvirus_code') + nvcode = VIRUS_SCAN_NONVIRUS_CODE if not nvcode: logger.info('nonvirus_code is not set, disable virus scan.') return False @@ -79,47 +75,24 @@ def parse_scan_config(self, config): logger.info('invalid nonvirus_code format, disable virus scan.') return False - if config.has_option(section_name, 'scan_interval'): - try: - self.scan_interval = config.getint(section_name, 'scan_interval') - except ValueError: - pass - - if config.has_option(section_name, 'scan_size_limit'): - try: - # in M unit - self.scan_size_limit = config.getint(section_name, 'scan_size_limit') - except ValueError: - pass - - if config.has_option(section_name, 'scan_skip_ext'): - exts = config.get(section_name, 'scan_skip_ext').split(',') + self.scan_interval = VIRUS_SCAN_SCAN_INTERVAL + + self.scan_size_limit = VIRUS_SCAN_SCAN_SIZE_LIMIT + + if VIRUS_SCAN_SCAN_SKIP_EXT: + exts = VIRUS_SCAN_SCAN_SKIP_EXT.split(',') # .jpg, .mp3, .mp4 format exts = [ext.strip() for ext in exts if ext] self.scan_skip_ext = [ext.lower() for ext in exts if len(ext) > 1 and ext[0] == '.'] - if config.has_option(section_name, 'threads'): - try: - self.threads = config.getint(section_name, 'threads') - except ValueError: - pass + if VIRUS_SCAN_THREADS: + self.threads = VIRUS_SCAN_THREADS return True - def parse_send_mail_config(self, config): - section_name = 'EMAIL SENDER' - key_enabled = 'enabled' - - if not config.has_section(section_name): - return - - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - enabled = parse_bool(enabled) - if not enabled: - return - - self.enable_send_mail = True + def parse_send_mail_config(self): + self.enable_send_mail = EMAIL_SENDER_ENABLED def is_enabled(self): return self.enable_scan diff --git a/dtable_events/webhook/webhook.py b/dtable_events/webhook/webhook.py index dbdf87bc..4bc2f1e1 100644 --- a/dtable_events/webhook/webhook.py +++ b/dtable_events/webhook/webhook.py @@ -26,9 +26,9 @@ class Webhooker(object): 2. query webhooks and generate jobs, then put them to queue. 3. trigger jobs one by one. 
""" - def __init__(self, config): - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + def __init__(self): + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() self._subscriber = self._redis_client.get_subscriber('table-events') self.job_queue = Queue() diff --git a/dtable_events/workflow/workflow_actions.py b/dtable_events/workflow/workflow_actions.py index dcf57cf3..199295d7 100644 --- a/dtable_events/workflow/workflow_actions.py +++ b/dtable_events/workflow/workflow_actions.py @@ -163,11 +163,11 @@ def do_workflow_actions(task_id, node_id, db_session): class WorkflowActionsHandler(Thread): - def __init__(self, config): + def __init__(self): Thread.__init__(self) self._finished = Event() - self._db_session_class = init_db_session_class(config) - self._redis_client = RedisClient(config) + self._db_session_class = init_db_session_class() + self._redis_client = RedisClient() def run(self): logger.info('Starting handle workflow actions...') diff --git a/dtable_events/workflow/workflow_schedules_scanner.py b/dtable_events/workflow/workflow_schedules_scanner.py index a3a0b297..56cf4a49 100644 --- a/dtable_events/workflow/workflow_schedules_scanner.py +++ b/dtable_events/workflow/workflow_schedules_scanner.py @@ -6,7 +6,7 @@ from sqlalchemy import text from apscheduler.schedulers.blocking import BlockingScheduler -from dtable_events.app.config import DTABLE_WEB_SERVICE_URL +from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, WORKFLOW_SCANNER_ENABLED from dtable_events.db import init_db_session_class from dtable_events.utils import get_opt_from_conf_or_env, parse_bool from dtable_events.utils.dtable_web_api import DTableWebAPI @@ -14,24 +14,13 @@ class WorkflowSchedulesScanner: - def __init__(self, config): + def __init__(self): self._enabled = True - self._parse_config(config) - self._db_session_class = init_db_session_class(config) - - def _parse_config(self, config): - section_name = 'WORKFLOW SCANNER' - key_enabled = 'enabled' - - if not config.has_section(section_name): - section_name = 'WORKFLOW-SCANNER' - if not config.has_section(section_name): - return - - # enabled - enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True) - enabled = parse_bool(enabled) - self._enabled = enabled + self._parse_config() + self._db_session_class = init_db_session_class() + + def _parse_config(self): + self._enabled = WORKFLOW_SCANNER_ENABLED def start(self): if not self._enabled: diff --git a/requirements.txt b/requirements.txt index 5cf96f64..db7da657 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ pdfminer.six==20221105 pdf2image~=1.17.0 playwright==1.48.0 psutil==6.1.0 +pyyaml==6.0.*