浏览代码

no message

libushang 16 小时之前
父节点
当前提交
8ef7c4b46f

+ 1 - 1
jobs/__init__.py

@@ -20,7 +20,7 @@ def register_jobs(scheduler: BaseScheduler):
     scheduler.add_job(yzy_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
     scheduler.add_job(yzy_proc, CronTrigger.from_crontab('0 */5 * * *'))
     
-    scheduler.add_job(yzy_msg_queue_proc, CronTrigger.from_crontab('* * * * *'))
+    scheduler.add_job(yzy_msg_queue_proc, CronTrigger.from_crontab('* * * * *'), max_instances=1)
 
     # 粤政易组织架构同步
     scheduler.add_job(yzy_unit_queue_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))

+ 68 - 0
jobs/yzy_job.py

@@ -3,6 +3,7 @@
 from datetime import datetime
 from sqlalchemy.sql import func
 from sqlalchemy.orm import Session
+from sqlalchemy import text, exists, and_, or_, not_
 from utils import *
 from utils.redis_util import *
 from models import *
@@ -10,6 +11,7 @@ from exceptions import *
 from database import get_local_db, get_share_db
 from extensions import logger
 from common import YzyApi
+from common.db import db_dict, db_msg_center
 from config import settings
 import traceback
 
@@ -56,6 +58,9 @@ def proc():
 
 def yzy_msg_queue_proc():
     print('yzy_msg_queue_proc ')
+
+    check_infopublic_msg()
+
     if settings.IS_PROD == False:
         return
     
@@ -81,6 +86,9 @@ def yzy_msg_queue_proc():
                         row.sent_status = 9
                         row.errmsg = resp['errmsg']
                     db.commit()
+
+                    handle_after_yzy_msg_sent(db, row)
+
                 except Exception as e:
                     traceback.print_exc()
 
@@ -114,6 +122,66 @@ def yzy_msg_queue_proc():
             redis_unlock(lock_key)
         print('yzy_msg_queue_proc_sk end.')
 
+def handle_after_yzy_msg_sent(db: Session, row: YzyMsgQueue) -> None:
+    if row.from_scenario == 'infopublish_responses':
+        response_id = row.foreign_key
+        if row.sent_status == 1:
+            db.query(InfoPublishResponses).filter(InfoPublishResponses.id == response_id).update({"sent_status": 1, "sent_time": row.sent_time})
+            db.commit()
+
+        elif row.sent_status == 9:
+            db.query(InfoPublishResponses).filter(InfoPublishResponses.id == response_id).update({"sent_status": 2, "sent_time": row.sent_time})
+            db.commit()
+    else:
+        ...
+
+
+# 检查是否有待发送【消息发布】的内容
+def check_infopublic_msg():
+    with get_local_db() as db:
+        infopublish_list = db.query(InfoPublishBase).filter(and_(InfoPublishBase.publish_status == 3, InfoPublishBase.del_flag == '0')).all()
+        logger.info("infopublish_list: {}", len(infopublish_list))
+        for infopublish_base in infopublish_list:
+            info_id = infopublish_base.id
+            info_type_text = db_dict.get_dict_label(db, "mm_info_type", infopublish_base.info_type)
+            system_from = infopublish_base.system_from
+
+            # 发送粤政易消息
+            rows = db.query(InfoPublishResponses).filter(and_(InfoPublishResponses.publish_id == info_id, InfoPublishResponses.sent_status == 0)).all()
+            for row in rows:
+                response_id = row.id
+                user_id = row.user_id
+                yzy_account = row.yzy_account
+
+                detail_url = f"{settings.YZY_WEB_ROOT}/yjxp/#/infoDetails?id={info_id}"
+                description = "消息类型: " + info_type_text + "\n发布时间: " + get_datetime_str(infopublish_base.publish_time) + "\n消息内容: " + infopublish_base.content
+                
+                logger.info("{}, {}", yzy_account, description)
+                data = {
+                    "yzy_userid": yzy_account,
+                    "mobile": '-',
+                    "content": description,
+                    "recorded_by": user_id,
+                    "detail_url": detail_url,
+                    "foreign_key": str(response_id),
+                    "from_scenario": "infopublish_responses",
+                    "title": info_type_text
+                }
+                YzyApi.add_to_msg_queue(db, data)
+
+                if system_from == "防御指令":
+                    msg_title = "防御指令"
+                    msg_description = f"你有{info_type_text}消息,请及时处理"
+                else:
+                    msg_title = info_type_text
+                    msg_description = f"你有{info_type_text}消息,请及时处理"
+                db_msg_center.add_message(db, info_type_text, user_id, msg_title, msg_description, str(info_id), 'infopublish_base')
+
+            # 发送完毕
+            infopublish_base.publish_status = 4
+            db.commit()
+
+
 def yzy_unit_queue_proc():
     if settings.IS_PROD == False:
         return

+ 1 - 0
models/xxfb_base.py

@@ -32,6 +32,7 @@ class InfoPublishBase(Base):
     user_err_count = Column(Integer, default='0', server_default='0', comment='失败人数')
     user_sending_count = Column(Integer, default='0', server_default='0', comment='发送中人数')
     info_type = Column(String, default='0', comment='消息类型 0 预警信息 1 灾情信息 2 灾情信息 3 指挥救援 4 公众防范')
+    system_from = Column(String, default='', server_default='', comment='发布源头')
 
     class Config:
         orm_mode = True

+ 82 - 4
routers/api/infoPublish/back.py

@@ -19,7 +19,7 @@ import traceback
 from utils import *
 from datetime import datetime, timedelta
 from common import YzyApi
-from common.db import db_dict
+from common.db import db_dict, db_msg_center
 from urllib.parse import quote
 import base64
 from config import settings
@@ -71,6 +71,9 @@ async def create_emergency_plan(
         
         examine_by = examine_user_row.user_id
 
+        
+        system_from = get_req_param_optional(body, 'system_from')
+
         new_publish = InfoPublishBase(
             title = body['title'],
             publish_group = body['publish_group'],
@@ -91,7 +94,8 @@ async def create_emergency_plan(
             user_ok_count = 0,
             user_err_count = 0,
             user_sending_count = 0,
-            info_type = body['info_type']
+            info_type = body['info_type'],
+            system_from = system_from
         )
         db.add(new_publish)
         db.commit()
@@ -111,7 +115,7 @@ async def create_emergency_plan(
                 send_user_name = mpfun.dec_data(user_row.user_name)
                 send_dept_row = db.query(SysDept).filter(SysDept.dept_id == user_row.dept_id).first()
                 send_dept_name = send_dept_row.dept_name
-                send_yzy_account = mpfun.dec_data(user_row.yzy_account)
+                send_yzy_account = (user_row.yzy_account)
 
                 if send_yzy_account is None or send_yzy_account == "":
                     send_yzy_account = user_row.phonenumber
@@ -226,6 +230,7 @@ async def get_publish_list(
     publish_status: str = Query('', description='发布状态的字典键值'),
     examine_status: str = Query('', description='审批状态的字典键值'),
     dispose_status: str = Query('', description='处理状态的字典键值'),
+    system_from: str = Query('', description='信息来源'),
     content: str = Query('', description='信息内容'),
     sort_by: str = Query('', description='排序字段'),
     sort_order: str = Query("asc", description='排序方式'),
@@ -245,6 +250,8 @@ async def get_publish_list(
             where = and_(where, InfoPublishBase.examine_status == examine_status)
         if publish_group != '':
             where = and_(where, InfoPublishBase.publish_group.like('%{}%'.format(publish_group)))
+        if system_from != '':
+            where = and_(where, InfoPublishBase.system_from == system_from)
 
         if dispose_status == '1' : # 1 待处理
             # 审核类型 20 待审批
@@ -625,4 +632,75 @@ def get_response_type_text(val: int) -> str:
     elif val == 2:
         return '签字确认'
     else:
-        return '未知'
+        return '未知'
+    
+
+@router.get('/find_examine_user')
+async def userlist( deptId: int = Query(None ,description='部门id'),
+                    userName: str = Query(None, description='用户名'),
+                    status: int = Query(None, description='用户状态'),
+                    phonenumber : str = Query(None, description='手机号'),
+                    page: int = Query(1, gt=0, description='页码'),
+                    pageSize: int = Query(10, gt=0, description='每页条目数量'),
+                    db: Session = Depends(get_db),
+                    user_id: int = Depends(valid_access_token)):
+    try:
+        # 构建查询
+        query = db.query(SysUser)
+        query = query.filter(SysUser.del_flag != '2')
+        
+        # 应用查询条件
+        if userName:
+            query =query.filter(or_(SysUser.user_name == mpfun.enc_data(userName), SysUser.nick_name.like(f'%{userName}%')))
+        
+        if status:
+            query =query.filter(SysUser.status == status)
+        
+        if phonenumber:
+            query =query.filter(SysUser.phonenumber == mpfun.enc_data(phonenumber))
+
+        # 计算总条目数
+        total_items = query.count()
+
+        # 排序
+        query = query.order_by(SysUser.create_time.desc())
+        # 执行分页查询
+        users = query.offset((page - 1) * pageSize).limit(pageSize).all()
+
+        # 将查询结果转换为列表形式的字典
+        user_list = []
+        for user in users:
+            user_info = {
+                "userId": user.user_id,
+                "tenantId": user.tenant_id,
+                "deptId": user.dept_id,
+                "userName": mpfun.dec_data(user.user_name),
+                "nickName": user.nick_name,
+                "userType": user.user_type,
+                "email": mpfun.dec_data(user.email),
+                "phonenumber": mpfun.dec_data(user.phonenumber),
+                "sex": user.sex,
+                "avatar": user.avatar,
+                "status": user.status,
+                "loginIp": user.login_ip,
+                "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
+                "remark": user.remark,
+                "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
+                "deptName": user.dept_name,
+            }
+            user_list.append(user_info)
+
+        # 返回结果
+        return {
+            "code": 200,
+            "msg": "成功用户列表",
+            "rows": user_list,
+            "total": total_items,
+            "page": page,
+            "pageSize": pageSize,
+            "totalPages": (total_items + pageSize - 1) // pageSize
+        }
+
+    except Exception as e:
+        traceback.print_exc()
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

+ 1 - 1
routers/api/resourceProvison/MaterialReserveManagement/transfer.py

@@ -127,7 +127,7 @@ def send_yzy_msg(db: Session, msg_type: str, to_user_id: int, yzy_account: str,
     }
     YzyApi.add_to_msg_queue(db, data)
 
-    db_msg_center.add_message(db, msg_type, to_user_id, "物资调配审批", description, foreign_key, from_scenario)
+    db_msg_center.add_message(db, msg_type, to_user_id, "物资调配", description, foreign_key, from_scenario)
 
 @router.get("/list")
 async def get_transfter_list(