diff --git a/freqtrade/persistence/binanceannouncements.py b/freqtrade/persistence/binanceannouncements.py index a721649a3..e872812f6 100644 --- a/freqtrade/persistence/binanceannouncements.py +++ b/freqtrade/persistence/binanceannouncements.py @@ -6,8 +6,6 @@ from sqlalchemy import create_engine from sqlalchemy.exc import ProgrammingError from sqlalchemy.types import DateTime -TABLE = "binanceannouncements" - def get_engine(uri: str): return create_engine(uri, pool_recycle=3600) @@ -17,12 +15,12 @@ def get_connection(uri: str): return get_engine(uri).connect() -def get_df(uri): +def get_df(uri, table_name): """Get dataframe and the first time create DB.""" connection = get_connection(uri) try: return pd.read_sql_table( - table_name=TABLE, + table_name=table_name, con=connection, index_col='index', columns=['Token', 'Text', 'Link', 'Datetime discover', 'Datetime announcement'], @@ -33,12 +31,12 @@ def get_df(uri): connection.close() -def save_df(df, uri): +def save_df(df, uri, table_name): """Save dataframe on DB.""" connection = get_connection(uri) try: df.to_sql( - name=TABLE, + name=table_name, con=connection, index=True, index_label='index', diff --git a/freqtrade/plugins/pairlist/AnnouncementsPairList.py b/freqtrade/plugins/pairlist/AnnouncementsPairList.py index 51dc20805..926322ec3 100644 --- a/freqtrade/plugins/pairlist/AnnouncementsPairList.py +++ b/freqtrade/plugins/pairlist/AnnouncementsPairList.py @@ -78,13 +78,14 @@ class BinanceAnnouncement(AnnouncementMixin): TOKEN_COL = 'Token' ANNOUNCEMENT_COL = 'Datetime announcement' + table_name = 'binanceannouncements' _df: Optional[pd.DataFrame] = None def __init__(self, refresh_period: Optional[int] = None, *args, **kwargs): - self.db_path = kwargs.get('db_path', "user_data/data/BinanceAnnouncements_announcements.csv") - self._refresh_period = refresh_period or self.REFRESH_PERIOD + self.db_path = kwargs.pop('db_path', "user_data/data/BinanceAnnouncements_announcements.csv") + super().__init__(refresh_period, *args, **kwargs) - def update_announcements(self, page_number=1, page_size=10, history=False) -> pd.DataFrame: + def update_announcements(self, page_number=1, page_size=10, max_page=100, history=False) -> pd.DataFrame: headers = { "Cache-Control": "max-age=0", } @@ -109,7 +110,7 @@ class BinanceAnnouncement(AnnouncementMixin): if not history: while 'Age' in (response.headers or {}): try: - url = self.get_api_url(random.randint(1, 100), page_size) + url = self.get_api_url(random.randint(1, max_page), page_size) response = get(url, headers=headers) except Exception: break @@ -212,12 +213,12 @@ class BinanceAnnouncement(AnnouncementMixin): def load_db(self): try: - self._df = binanceannouncements.get_df(self.db_path) + self._df = binanceannouncements.get_df(self.db_path, self.table_name) except binanceannouncements.ProgrammingError: pass def save_db(self): - binanceannouncements.save_df(self._df, self.db_path) + binanceannouncements.save_df(self._df, self.db_path, self.table_name) def _save_df(self, df: pd.DataFrame): self._df = df.sort_values(by='Datetime announcement') @@ -255,6 +256,168 @@ class BinanceAnnouncement(AnnouncementMixin): return self.BINANCE_API_URL.format(self.BINANCE_CATALOG_ID, page_number, page_size) +class KucoinAnnouncement(AnnouncementMixin): + + CATALOG = 'listing' + BASE_URL = "https://www.kucoin.com/_api/cms/articles" + ANNOUNCEMENT_URL = 'https://www.kucoin.com/news/' + API_QUERY = "?page={}&pageSize={}&category={}&c=d3a4f4dcc33844fbbabc3f9cc0abb3cf&lang=en_US" + API_URL = BASE_URL + API_QUERY + + # token info + TOKEN_REGEX = re.compile(r'\((\w+)\)') + + # 'opens trading', 'defi', 'uniswap', 'airdrop' + KEY_WORDS_BLACKLIST = ['listing postponed', 'futures', 'leveraged'] + + REFRESH_PERIOD = 3 + + # storage + COLS = ['Token', 'Text', 'Link', 'Datetime discover', 'Datetime announcement'] + + TOKEN_COL = 'Token' + ANNOUNCEMENT_COL = 'Datetime announcement' + + table_name = 'kucoinannouncements' + + _df: Optional[pd.DataFrame] = None + + def __init__(self, refresh_period: Optional[int] = None, *args, **kwargs): + self.db_path = kwargs.get('db_path', "user_data/data/KucoinAnnouncements_announcements.csv") + self._refresh_period = refresh_period or self.REFRESH_PERIOD + super().__init__(*args, **kwargs) + + def update_announcements(self, page_number=1, page_size=10, max_page=100, history=False) -> pd.DataFrame: + headers = { + "Cache-Control": "max-age=0", + } + response = None + url = self.get_api_url(page_number, page_size) + + if history: + # recursive updating + return [self.update_announcements( + page, page_size, history=False + ) for page in reversed(range(2, 56))][-1] + + try: + now = datetime.now(tz=pytz.utc) + df = self.get_df() + + try: + response = get(url, headers=headers) + except Exception as e: + raise TemporaryError(f"Kucoin url ({url}) is not available. Original Exception: {e}") + + if response.status_code != 200: + raise TemporaryError(f"Invalid response from url: {url}.\n" + f"Status code: {response.status_code}\n" + f"Content: {response.content.decode()}") + + logger.info("Updating from Kucoin...") + updated_list = [] + articles = response.json()['items'] or [] + for article in articles: + article_link = self.get_announcement_url(article['path']) + article_text = article['title'] + + tokens = self._get_tokens(article_text) + + for token in tokens: + if token: + updated_list.extend( + self._get_new_data( + data=article, + now=now, + token=token, + article_text_lower=article_text.lower(), + article_link=article_link, + article_text=article_text, + df=df + ) + ) + + if df is not None: + df = df.append(pd.DataFrame(updated_list, columns=self.COLS), ignore_index=True) + else: + df = pd.DataFrame(updated_list, columns=self.COLS) + + if updated_list: + logger.info(f"Adding tokens to database: {[upd[0] for upd in updated_list]}") + self._save_df(df) + return df + + except TemporaryError as e: + # exception handled, re-raise + logger.error(e) + raise e + + except Exception as e: + # exception not handled raise OperationalException + logger.error(e) + raise OperationalException(f"Some errors occurred processing KUCOIN data. " + f"Url: {url}.\n" + f"Status code: {response.status_code if response else None}\n" + f"Content: {response.content.decode() if response else None}\n" + f"Exception: {e}") + + def _get_new_data(self, data, now, token, article_text_lower, article_link, article_text, df=None): + updated_list = [] + conditions_buy = ( + df is None # is first time data + or not (token is None or token in df[self.TOKEN_COL].values) # not an existing or null token + ) + if conditions_buy: + if any(i in article_text_lower for i in self.KEY_WORDS_BLACKLIST): + logger.debug(f'BLACKLISTED: "{article_text}", skip.') + return updated_list + + if token: + logger.info(f'Found new announcement: "{article_text}". Token: {token}.') + updated_list.append( + [token, article_text, article_link, now, self.get_datetime_announcement(data)] + ) + return updated_list + + def _get_tokens(self, text: str): + return self.TOKEN_REGEX.findall(text) + + def get_df(self): + """Get Dataframe from CSV file or from Database""" + if self._df is None: + self.load_csv_db() if self.db_path.endswith('csv') else self.load_db() + return self._df + + def load_csv_db(self): + try: + self._df = pd.read_csv(self.db_path, parse_dates=['Datetime discover', 'Datetime announcement']) + except FileNotFoundError: + pass + + def load_db(self): + try: + self._df = binanceannouncements.get_df(self.db_path, self.table_name) + except binanceannouncements.ProgrammingError: + pass + + def save_db(self): + binanceannouncements.save_df(self._df, self.db_path, self.table_name) + + def _save_df(self, df: pd.DataFrame): + self._df = df.sort_values(by='Datetime announcement') + self._df.to_csv(self.db_path, index=False) if self.db_path.endswith('csv') else self.save_db() + + def get_datetime_announcement(self, data: dict): + return datetime.strptime(data['publish_at'], "YYYY-mm-dd HH:MM:SS") + + def get_announcement_url(self, path: str) -> str: + # https://www.kucoin.com/news/en-aragon-ant-gets-listed-on-kucoin + return "".join([self.ANNOUNCEMENT_URL, path]) + + def get_api_url(self, page_number: int = 1, page_size: int = 10) -> str: + return self.API_URL.format(page_number, page_size, self.CATALOG) + + class AnnouncementsPairList(IPairList): """ Announcement pair list. @@ -352,21 +515,20 @@ class AnnouncementsPairList(IPairList): return [f"{token}/{self._stake_currency}" for token in df[self.pair_exchange.TOKEN_COL]] def _init_pair_exchange(self, config, **pair_exchange_kwargs): - exchange = None + pair_exchange_kwargs['db_path'] = config.get('ba_database_uri', None) + pair_exchange_kwargs['refresh_period'] = self._refresh_period + if self._pair_exchange == 'binance': - exchange = BinanceAnnouncement( - db_path=config.get('ba_database_uri', None), - refresh_period=self._refresh_period, - **pair_exchange_kwargs - ) + exchange = BinanceAnnouncement(**pair_exchange_kwargs) + elif self._pair_exchange == 'kucoin': + exchange = KucoinAnnouncement(**pair_exchange_kwargs) + else: + raise OperationalException(f'Exchange `{self._pair_exchange}` is not supported yet') - if exchange: - assert hasattr(exchange, 'update_announcements'), '`update_announcements` method is required' - assert hasattr(exchange, 'TOKEN_COL'), '`TOKEN_COL` attribute is required' - assert hasattr(exchange, 'ANNOUNCEMENT_COL'), '`ANNOUNCEMENT_COL` attribute is required' - return exchange - - raise OperationalException(f'Exchange `{self._pair_exchange}` is not supported yet') + assert hasattr(exchange, 'update_announcements'), '`update_announcements` method is required' + assert hasattr(exchange, 'TOKEN_COL'), '`TOKEN_COL` attribute is required' + assert hasattr(exchange, 'ANNOUNCEMENT_COL'), '`ANNOUNCEMENT_COL` attribute is required' + return exchange def _validate_pair(self, pair: str, ticker: Dict[str, Any]) -> bool: return NotImplemented